Skip to content

Commit

Permalink
[YUNIKORN-478]Handle app completion at the shim side (#213)
Browse files Browse the repository at this point in the history
  • Loading branch information
HuangTing-Yao authored Dec 23, 2020
1 parent fc51dbc commit 2432d4d
Show file tree
Hide file tree
Showing 11 changed files with 191 additions and 23 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ go 1.12

require (
github.com/GoogleCloudPlatform/spark-on-k8s-operator v0.0.0-20200817155620-c19d2b8660d8
github.com/apache/incubator-yunikorn-core v0.0.0-20201210085111-01f5288eb593
github.com/apache/incubator-yunikorn-scheduler-interface v0.9.1-0.20200901200728-b9033558f319
github.com/apache/incubator-yunikorn-core v0.0.0-20201218082332-5471d84cd619
github.com/apache/incubator-yunikorn-scheduler-interface v0.9.1-0.20201215141356-df4d86d2197b
github.com/google/uuid v1.1.1
github.com/looplab/fsm v0.1.0
github.com/onsi/ginkgo v1.11.0
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,12 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c=
github.com/apache/incubator-yunikorn-core v0.0.0-20201210085111-01f5288eb593 h1:iwGpaNP9WYBGsRSjHSgwW6QVsmK69z6KL/VGUa+sZrI=
github.com/apache/incubator-yunikorn-core v0.0.0-20201210085111-01f5288eb593/go.mod h1:SDGJNdgVBEfSvbEvP+kci46HzBaFXNLeHcqFQN4zur0=
github.com/apache/incubator-yunikorn-core v0.0.0-20201218082332-5471d84cd619 h1:OTtIjML3c9XijNkufiT5AWquCh2TXgtqVjQKIxMqWsY=
github.com/apache/incubator-yunikorn-core v0.0.0-20201218082332-5471d84cd619/go.mod h1:d/fn47kdGd094NHiysHAY+0d4evpYRFQ6q5b9eaBONo=
github.com/apache/incubator-yunikorn-scheduler-interface v0.9.1-0.20200901200728-b9033558f319 h1:I12nCcXdHe6W4oysVejFHfQDy0Ix0r+ZHdfooyEoldo=
github.com/apache/incubator-yunikorn-scheduler-interface v0.9.1-0.20200901200728-b9033558f319/go.mod h1:ObMs03XFbnmpGD81jYvdUDEVZbHvz8W6dWH5nGDCjc0=
github.com/apache/incubator-yunikorn-scheduler-interface v0.9.1-0.20201215141356-df4d86d2197b h1:dxncLtkTwtqUB6pgsVVw52+Ni3ZXG2BPnLrSrubcRZA=
github.com/apache/incubator-yunikorn-scheduler-interface v0.9.1-0.20201215141356-df4d86d2197b/go.mod h1:ObMs03XFbnmpGD81jYvdUDEVZbHvz8W6dWH5nGDCjc0=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/asaskevich/govalidator v0.0.0-20180720115003-f9ffefc3facf/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
Expand Down
35 changes: 30 additions & 5 deletions pkg/cache/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ func NewApplication(appID, queueName, user string, tags map[string]string, sched
{Name: string(events.RunApplication),
Src: []string{states.Accepted, states.Running},
Dst: states.Running},
{Name: string(events.ReleaseAppAllocation),
Src: []string{states.Running},
Dst: states.Running},
{Name: string(events.CompleteApplication),
Src: []string{states.Running},
Dst: states.Completed},
Expand All @@ -101,11 +104,12 @@ func NewApplication(appID, queueName, user string, tags map[string]string, sched
Dst: states.Killed},
},
fsm.Callbacks{
string(events.SubmitApplication): app.handleSubmitApplicationEvent,
string(events.RecoverApplication): app.handleRecoverApplicationEvent,
string(events.RejectApplication): app.handleRejectApplicationEvent,
string(events.CompleteApplication): app.handleCompleteApplicationEvent,
events.EnterState: app.enterState,
string(events.SubmitApplication): app.handleSubmitApplicationEvent,
string(events.RecoverApplication): app.handleRecoverApplicationEvent,
string(events.RejectApplication): app.handleRejectApplicationEvent,
string(events.CompleteApplication): app.handleCompleteApplicationEvent,
string(events.ReleaseAppAllocation): app.handleReleaseAppAllocationEvent,
events.EnterState: app.enterState,
},
)

Expand Down Expand Up @@ -369,6 +373,27 @@ func (app *Application) handleCompleteApplicationEvent(event *fsm.Event) {
// TODO app lifecycle updates
}

func (app *Application) handleReleaseAppAllocationEvent(event *fsm.Event) {
eventArgs := make([]string, 2)
if err := events.GetEventArgsAsStrings(eventArgs, event.Args); err != nil {
log.Logger().Error("fail to paser event arg", zap.Error(err))
return
}
allocUUID := eventArgs[0]
log.Logger().Info("try to release pod from application",
zap.String("appID", app.applicationID),
zap.String("allocationUUID", allocUUID))

for _, task := range app.taskMap {
if task.allocationUUID == allocUUID {
err := task.DeleteTaskPod(task.pod)
if err != nil {
log.Logger().Error("failed to release allocation from application", zap.Error(err))
}
}
}
}

func (app *Application) enterState(event *fsm.Event) {
log.Logger().Debug("shim app state transition",
zap.String("app", app.applicationID),
Expand Down
39 changes: 38 additions & 1 deletion pkg/cache/application_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@

package cache

import "github.com/apache/incubator-yunikorn-k8shim/pkg/common/events"
import (
"github.com/apache/incubator-yunikorn-k8shim/pkg/common/events"
"github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si"
)

// ------------------------
// SimpleApplicationEvent simples moves application states
Expand Down Expand Up @@ -160,3 +163,37 @@ func (fe FailApplicationEvent) GetArgs() []interface{} {
func (fe FailApplicationEvent) GetApplicationID() string {
return fe.applicationID
}

// ------------------------
// Release application allocations
// ------------------------
type ReleaseAppAllocationEvent struct {
applicationID string
allocationUUID string
terminationType string
event events.ApplicationEventType
}

func NewReleaseAppAllocationEvent(appID string, allocTermination si.AllocationRelease_TerminationType, uuid string) ReleaseAppAllocationEvent {
return ReleaseAppAllocationEvent{
applicationID: appID,
allocationUUID: uuid,
terminationType: si.AllocationRelease_TerminationType_name[int32(allocTermination)],
event: events.ReleaseAppAllocation,
}
}

func (re ReleaseAppAllocationEvent) GetApplicationID() string {
return re.applicationID
}

func (re ReleaseAppAllocationEvent) GetArgs() []interface{} {
args := make([]interface{}, 2)
args[0] = re.allocationUUID
args[1] = re.terminationType
return args
}

func (re ReleaseAppAllocationEvent) GetEvent() events.ApplicationEventType {
return re.event
}
46 changes: 46 additions & 0 deletions pkg/cache/application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"gotest.tools/assert"
is "gotest.tools/assert/cmp"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
apis "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/apache/incubator-yunikorn-core/pkg/api"
Expand Down Expand Up @@ -92,6 +93,51 @@ func TestRunApplication(t *testing.T) {
assertAppState(t, app, events.States().Application.Submitted, 3*time.Second)
}

func TestReleaseAppAllocation(t *testing.T) {
context := initContextForTest()
ms := &mockSchedulerAPI{}
resources := make(map[v1.ResourceName]resource.Quantity)
containers := make([]v1.Container, 0)
containers = append(containers, v1.Container{
Name: "container-01",
Resources: v1.ResourceRequirements{
Requests: resources,
},
})
pod := &v1.Pod{
TypeMeta: apis.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: apis.ObjectMeta{
Name: "pod-test-00001",
UID: "UID-00001",
},
Spec: v1.PodSpec{
Containers: containers,
},
}
appID := "app-test-001"
UUID := "testUUID001"
app := NewApplication(appID, "root.abc", "testuser", map[string]string{}, ms)
task := NewTask("task01", app, context, pod)
app.addTask(task)
task.allocationUUID = UUID
// app must be running states
err := app.handle(NewReleaseAppAllocationEvent(appID, si.AllocationRelease_TIMEOUT, UUID))
if err == nil {
// this should give an error
t.Error("expecting error got 'nil'")
}
// set app states to running, let event can be trigger
app.SetState(events.States().Application.Running)
assertAppState(t, app, events.States().Application.Running, 3*time.Second)
err = app.handle(NewReleaseAppAllocationEvent(appID, si.AllocationRelease_TIMEOUT, UUID))
assert.NilError(t, err)
// after handle release event the states of app must be running
assertAppState(t, app, events.States().Application.Running, 3*time.Second)
}

func newMockSchedulerAPI() *mockSchedulerAPI {
return &mockSchedulerAPI{
registerFn: func(request *si.RegisterResourceManagerRequest, callback api.ResourceManagerCallback) (response *si.RegisterResourceManagerResponse, e error) {
Expand Down
10 changes: 10 additions & 0 deletions pkg/cache/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,16 @@ func (ctx *Context) RemoveApplication(appID string) error {
}
}

func (ctx *Context) RemoveApplicationInternal(appID string) error {
ctx.lock.Lock()
defer ctx.lock.Unlock()
if _, exist := ctx.applications[appID]; exist {
delete(ctx.applications, appID)
return nil
}
return fmt.Errorf("application %s is not found in the context", appID)
}

// this implements ApplicationManagementProtocol
func (ctx *Context) AddTask(request *interfaces.AddTaskRequest) interfaces.ManagedTask {
log.Logger().Debug("AddTask",
Expand Down
27 changes: 27 additions & 0 deletions pkg/cache/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,33 @@ func TestRemoveApplication(t *testing.T) {
assert.Assert(t, app != nil)
}

func TestRemoveApplicationInternal(t *testing.T) {
context := initContextForTest()
appID1 := "app00001"
appID2 := "app00002"
app1 := NewApplication(appID1, "root.a", "testuser", map[string]string{}, newMockSchedulerAPI())
app2 := NewApplication(appID2, "root.b", "testuser", map[string]string{}, newMockSchedulerAPI())
context.applications[appID1] = app1
context.applications[appID2] = app2
assert.Equal(t, len(context.applications), 2)
// remove non-exist app
err := context.RemoveApplicationInternal("app00003")
assert.Assert(t, err != nil)
assert.Equal(t, len(context.applications), 2)
// remove app1
err = context.RemoveApplicationInternal(appID1)
assert.NilError(t, err)
assert.Equal(t, len(context.applications), 1)
_, ok := context.applications[appID1]
assert.Equal(t, ok, false)
// remove app2
err = context.RemoveApplicationInternal(appID2)
assert.NilError(t, err)
assert.Equal(t, len(context.applications), 0)
_, ok = context.applications[appID2]
assert.Equal(t, ok, false)
}

func TestAddTask(t *testing.T) {
context := initContextForTest()

Expand Down
4 changes: 4 additions & 0 deletions pkg/cache/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ func (task *Task) getTaskAllocationUUID() string {
return task.allocationUUID
}

func (task *Task) DeleteTaskPod(pod *v1.Pod) error {
return task.context.apiProvider.GetAPIs().KubeClient.Delete(task.pod)
}

func (task *Task) isTerminated() bool {
for _, states := range events.States().Task.Terminated {
if task.GetTaskState() == states {
Expand Down
20 changes: 17 additions & 3 deletions pkg/callback/scheduler_callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,16 +118,30 @@ func (callback *AsyncRMCallback) RecvUpdateResponse(response *si.UpdateResponse)
for _, release := range response.ReleasedAllocations {
log.Logger().Debug("callback: response to released allocations",
zap.String("UUID", release.UUID))

// TerminationType 0 mean STOPPED_BY_RM
if release.TerminationType != si.AllocationRelease_STOPPED_BY_RM {
// send release app allocation to application states machine
ev := cache.NewReleaseAppAllocationEvent(release.ApplicationID, release.TerminationType, release.UUID)
dispatcher.Dispatch(ev)
}
}

// handle status changes
for _, updated := range response.UpdatedApplications {
log.Logger().Debug("status update callback received",
zap.String("appId", updated.ApplicationID),
zap.String("new status", updated.State))

//handle status update
dispatcher.Dispatch(cache.NewApplicationStatusChangeEvent(updated.ApplicationID, events.AppStateChange, updated.State))
// delete application from context
if updated.State == events.States().Application.Completed {
err := callback.context.RemoveApplicationInternal(updated.ApplicationID)
if err != nil {
log.Logger().Error("failed to delete application", zap.Error(err))
}
} else {
// handle status update
dispatcher.Dispatch(cache.NewApplicationStatusChangeEvent(updated.ApplicationID, events.AppStateChange, updated.State))
}
}

return nil
Expand Down
21 changes: 11 additions & 10 deletions pkg/common/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,17 @@ type SchedulingEvent interface {
type ApplicationEventType string

const (
SubmitApplication ApplicationEventType = "SubmitApplication"
RecoverApplication ApplicationEventType = "RecoverApplication"
AcceptApplication ApplicationEventType = "AcceptApplication"
RunApplication ApplicationEventType = "RunApplication"
RejectApplication ApplicationEventType = "RejectApplication"
CompleteApplication ApplicationEventType = "CompleteApplication"
FailApplication ApplicationEventType = "FailApplication"
KillApplication ApplicationEventType = "KillApplication"
KilledApplication ApplicationEventType = "KilledApplication"
AppStateChange ApplicationEventType = "ApplicationStateChange"
SubmitApplication ApplicationEventType = "SubmitApplication"
RecoverApplication ApplicationEventType = "RecoverApplication"
AcceptApplication ApplicationEventType = "AcceptApplication"
RunApplication ApplicationEventType = "RunApplication"
RejectApplication ApplicationEventType = "RejectApplication"
CompleteApplication ApplicationEventType = "CompleteApplication"
FailApplication ApplicationEventType = "FailApplication"
KillApplication ApplicationEventType = "KillApplication"
KilledApplication ApplicationEventType = "KilledApplication"
ReleaseAppAllocation ApplicationEventType = "ReleaseAppAllocation"
AppStateChange ApplicationEventType = "ApplicationStateChange"
)

type ApplicationEvent interface {
Expand Down
4 changes: 2 additions & 2 deletions pkg/common/si_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ func CreateReleaseAskRequestForTask(appID, taskId, partition string) si.UpdateRe
}

func CreateReleaseAllocationRequestForTask(appID, allocUUID, partition string) si.UpdateRequest {
toReleases := make([]*si.AllocationReleaseRequest, 0)
toReleases = append(toReleases, &si.AllocationReleaseRequest{
toReleases := make([]*si.AllocationRelease, 0)
toReleases = append(toReleases, &si.AllocationRelease{
ApplicationID: appID,
UUID: allocUUID,
PartitionName: partition,
Expand Down

0 comments on commit 2432d4d

Please sign in to comment.