From 09ae82506cf5afa41b34a32e4667a372e322df0a Mon Sep 17 00:00:00 2001 From: Manikandan R Date: Thu, 6 Oct 2022 20:17:05 +0530 Subject: [PATCH] [YUNIKORN-1328] Handle application state changes and trigger tracker interfaces --- pkg/scheduler/objects/application.go | 26 +++++++++++++++++++ pkg/scheduler/objects/application_state.go | 25 +++++++++--------- pkg/scheduler/partition.go | 4 +++ pkg/scheduler/ugm/manager.go | 30 ++++++++++++++++------ pkg/scheduler/ugm/manager_test.go | 28 ++++++++++---------- pkg/scheduler/ugm/user_tracker.go | 2 +- pkg/scheduler/ugm/user_tracker_test.go | 4 +-- 7 files changed, 82 insertions(+), 37 deletions(-) diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go index adbab4ffe..5812132b3 100644 --- a/pkg/scheduler/objects/application.go +++ b/pkg/scheduler/objects/application.go @@ -36,6 +36,7 @@ import ( "github.com/apache/yunikorn-core/pkg/log" "github.com/apache/yunikorn-core/pkg/metrics" "github.com/apache/yunikorn-core/pkg/rmproxy/rmevent" + "github.com/apache/yunikorn-core/pkg/scheduler/ugm" siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common" "github.com/apache/yunikorn-scheduler-interface/lib/go/si" ) @@ -1420,8 +1421,11 @@ func (sa *Application) addAllocationInternal(info *Allocation) { if resources.IsZero(sa.allocatedPlaceholder) { sa.initPlaceholderTimer() } + // User resource usage needs to be updated even during resource allocation happen for ph's itself even though state change would happen only after all ph allocation completes. + sa.incUserResourceUsage(info.GetAllocatedResource()) sa.allocatedPlaceholder = resources.Add(sa.allocatedPlaceholder, info.GetAllocatedResource()) sa.maxAllocatedResource = resources.ComponentWiseMax(sa.allocatedPlaceholder, sa.maxAllocatedResource) + // If there are no more placeholder to allocate we should move state if resources.Equals(sa.allocatedPlaceholder, sa.placeholderAsk) { if err := sa.HandleApplicationEvent(RunApplication); err != nil { @@ -1442,12 +1446,33 @@ func (sa *Application) addAllocationInternal(info *Allocation) { zap.Error(err)) } } + sa.incUserResourceUsage(info.GetAllocatedResource()) sa.allocatedResource = resources.Add(sa.allocatedResource, info.GetAllocatedResource()) sa.maxAllocatedResource = resources.ComponentWiseMax(sa.allocatedResource, sa.maxAllocatedResource) } sa.allocations[info.GetUUID()] = info } +func (sa *Application) incUserResourceUsage(resource *resources.Resource) { + if err := ugm.GetUserManager().IncreaseTrackedResource(sa.GetQueuePath(), sa.ApplicationID, resource, sa.GetUser()); err != nil { + log.Logger().Error("Unable to track the user resource usage", + zap.String("application id", sa.ApplicationID), + zap.String("user", sa.GetUser().User), + zap.String("currentState", sa.stateMachine.Current()), + zap.Error(err)) + } +} + +func (sa *Application) decUserResourceUsage(resource *resources.Resource, removeApp bool) { + if err := ugm.GetUserManager().DecreaseTrackedResource(sa.GetQueuePath(), sa.ApplicationID, resource, sa.GetUser(), removeApp); err != nil { + log.Logger().Error("Unable to track the user resource usage", + zap.String("application id", sa.ApplicationID), + zap.String("user", sa.GetUser().User), + zap.String("currentState", sa.stateMachine.Current()), + zap.Error(err)) + } +} + func (sa *Application) ReplaceAllocation(uuid string) *Allocation { sa.Lock() defer sa.Unlock() @@ -1523,6 +1548,7 @@ func (sa *Application) removeAllocationInternal(uuid string) *Allocation { } } } else { + sa.decUserResourceUsage(alloc.GetAllocatedResource(), false) sa.allocatedResource = resources.Sub(sa.allocatedResource, alloc.GetAllocatedResource()) // When the resource trackers are zero we should not expect anything to come in later. if sa.hasZeroAllocations() { diff --git a/pkg/scheduler/objects/application_state.go b/pkg/scheduler/objects/application_state.go index 4447847a0..cd03f6d6a 100644 --- a/pkg/scheduler/objects/application_state.go +++ b/pkg/scheduler/objects/application_state.go @@ -22,12 +22,11 @@ import ( "fmt" "time" + "github.com/looplab/fsm" "go.uber.org/zap" "github.com/apache/yunikorn-core/pkg/log" "github.com/apache/yunikorn-core/pkg/metrics" - - "github.com/looplab/fsm" ) const noTransition = "no transition" @@ -148,10 +147,21 @@ func NewAppState() *fsm.FSM { fmt.Sprintf("enter_%s", Starting.String()): func(event *fsm.Event) { app := event.Args[0].(*Application) //nolint:errcheck app.setStateTimer(app.startTimeout, app.stateMachine.Current(), RunApplication) + metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsRunning() + metrics.GetSchedulerMetrics().IncTotalApplicationsRunning() + }, + fmt.Sprintf("enter_%s", Resuming.String()): func(event *fsm.Event) { + app := event.Args[0].(*Application) //nolint:errcheck + app.decUserResourceUsage(app.GetAllocatedResource(), true) + metrics.GetQueueMetrics(app.queuePath).DecQueueApplicationsRunning() + metrics.GetSchedulerMetrics().DecTotalApplicationsRunning() }, fmt.Sprintf("enter_%s", Completing.String()): func(event *fsm.Event) { app := event.Args[0].(*Application) //nolint:errcheck app.setStateTimer(completingTimeout, app.stateMachine.Current(), CompleteApplication) + app.decUserResourceUsage(app.GetAllocatedResource(), true) + metrics.GetQueueMetrics(app.queuePath).DecQueueApplicationsRunning() + metrics.GetSchedulerMetrics().DecTotalApplicationsRunning() }, fmt.Sprintf("leave_%s", New.String()): func(event *fsm.Event) { app := event.Args[0].(*Application) //nolint:errcheck @@ -169,16 +179,6 @@ func NewAppState() *fsm.FSM { app.rejectedMessage = event.Args[1].(string) //nolint:errcheck } }, - fmt.Sprintf("enter_%s", Running.String()): func(event *fsm.Event) { - app := event.Args[0].(*Application) //nolint:errcheck - metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsRunning() - metrics.GetSchedulerMetrics().IncTotalApplicationsRunning() - }, - fmt.Sprintf("leave_%s", Running.String()): func(event *fsm.Event) { - app := event.Args[0].(*Application) //nolint:errcheck - metrics.GetQueueMetrics(app.queuePath).DecQueueApplicationsRunning() - metrics.GetSchedulerMetrics().DecTotalApplicationsRunning() - }, fmt.Sprintf("enter_%s", Completed.String()): func(event *fsm.Event) { app := event.Args[0].(*Application) //nolint:errcheck metrics.GetSchedulerMetrics().IncTotalApplicationsCompleted() @@ -189,6 +189,7 @@ func NewAppState() *fsm.FSM { }, fmt.Sprintf("enter_%s", Failed.String()): func(event *fsm.Event) { app := event.Args[0].(*Application) //nolint:errcheck + app.decUserResourceUsage(app.GetAllocatedResource(), true) metrics.GetQueueMetrics(app.queuePath).DecQueueApplicationsRunning() metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsFailed() metrics.GetSchedulerMetrics().DecTotalApplicationsRunning() diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go index e1e5300d3..9fbf5eaa3 100644 --- a/pkg/scheduler/partition.go +++ b/pkg/scheduler/partition.go @@ -38,6 +38,7 @@ import ( "github.com/apache/yunikorn-core/pkg/scheduler/objects" "github.com/apache/yunikorn-core/pkg/scheduler/placement" "github.com/apache/yunikorn-core/pkg/scheduler/policies" + "github.com/apache/yunikorn-core/pkg/scheduler/ugm" "github.com/apache/yunikorn-core/pkg/webservice/dao" "github.com/apache/yunikorn-scheduler-interface/lib/go/si" ) @@ -63,6 +64,7 @@ type PartitionContext struct { totalPartitionResource *resources.Resource // Total node resources allocations int // Number of allocations on the partition stateDumpFilePath string // Path of output file for state dumps + ugm *ugm.Manager // User group manager // The partition write lock must not be held while manipulating an application. // Scheduling is running continuously as a lock free background task. Scheduling an application @@ -95,6 +97,8 @@ func newPartitionContext(conf configs.PartitionConfig, rmID string, cc *ClusterC nodes: objects.NewNodeCollection(conf.Name), } pc.partitionManager = newPartitionManager(pc, cc) + ugm.Init() + pc.ugm = ugm.GetUserManager() if err := pc.initialPartitionFromConfig(conf); err != nil { return nil, err } diff --git a/pkg/scheduler/ugm/manager.go b/pkg/scheduler/ugm/manager.go index ba0a576ea..250690679 100644 --- a/pkg/scheduler/ugm/manager.go +++ b/pkg/scheduler/ugm/manager.go @@ -29,12 +29,21 @@ import ( "github.com/apache/yunikorn-core/pkg/log" ) +var once sync.Once +var m *Manager + type Manager struct { userTrackers map[string]*UserTracker groupTrackers map[string]*GroupTracker sync.RWMutex } +func Init() { + once.Do(func() { + m = NewManager() + }) +} + func NewManager() *Manager { manager := &Manager{ userTrackers: make(map[string]*UserTracker), @@ -42,8 +51,13 @@ func NewManager() *Manager { } return manager } -func (m *Manager) IncreaseTrackedResource(queuePath string, applicationID string, usage *resources.Resource, user *security.UserGroup) error { - if queuePath == "" || applicationID == "" || usage == nil || user == nil { + +func GetUserManager() *Manager { + return m +} + +func (m *Manager) IncreaseTrackedResource(queuePath string, applicationID string, usage *resources.Resource, user security.UserGroup) error { + if queuePath == "" || applicationID == "" || usage == nil || user.User == "" { return fmt.Errorf("mandatory parameters are missing. queuepath: %s, application id: %s, resource usage: %s, user: %s", queuePath, applicationID, usage.String(), user.User) } @@ -71,8 +85,8 @@ func (m *Manager) IncreaseTrackedResource(queuePath string, applicationID string return nil } -func (m *Manager) DecreaseTrackedResource(queuePath string, applicationID string, usage *resources.Resource, user *security.UserGroup, removeApp bool) error { - if queuePath == "" || applicationID == "" || usage == nil || user == nil { +func (m *Manager) DecreaseTrackedResource(queuePath string, applicationID string, usage *resources.Resource, user security.UserGroup, removeApp bool) error { + if queuePath == "" || applicationID == "" || usage == nil || user.User == "" { return fmt.Errorf("mandatory parameters are missing. queuepath: %s, application id: %s, resource usage: %s, user: %s", queuePath, applicationID, usage.String(), user.User) } @@ -114,7 +128,7 @@ func (m *Manager) DecreaseTrackedResource(queuePath string, applicationID string return nil } -func (m *Manager) ensureGroupTrackerForApp(applicationID string, user *security.UserGroup) { +func (m *Manager) ensureGroupTrackerForApp(applicationID string, user security.UserGroup) { userTracker := m.userTrackers[user.User] if !userTracker.hasGroupForApp(applicationID) { var groupTracker *GroupTracker @@ -131,7 +145,7 @@ func (m *Manager) ensureGroupTrackerForApp(applicationID string, user *security. // getGroup Based on the current limitations, group name and username is same. hence, using username as group name. // It would be changed in future based on user group resolution, limit configuration processing etc -func (m *Manager) getGroup(user *security.UserGroup) string { +func (m *Manager) getGroup(user security.UserGroup) string { return user.User } @@ -167,10 +181,10 @@ func (m *Manager) isGroupRemovable(gt *GroupTracker) bool { } // only for tests -func (m *Manager) getUserTrackers() map[string]*UserTracker { +func (m *Manager) GetUserTrackers() map[string]*UserTracker { return m.userTrackers } -func (m *Manager) getGroupTrackers() map[string]*GroupTracker { +func (m *Manager) GetGroupTrackers() map[string]*GroupTracker { return m.groupTrackers } diff --git a/pkg/scheduler/ugm/manager_test.go b/pkg/scheduler/ugm/manager_test.go index caa0afa37..7b9978f5e 100644 --- a/pkg/scheduler/ugm/manager_test.go +++ b/pkg/scheduler/ugm/manager_test.go @@ -31,7 +31,7 @@ func TestAddRemoveUserAndGroups(t *testing.T) { // Queue setup: // root->parent->child1 // root->parent->child2 - user := &security.UserGroup{User: "test"} + user := security.UserGroup{User: "test"} usage1, err := resources.NewResourceFromConf(map[string]string{"mem": "5M", "vcore": "5"}) if err != nil { t.Errorf("new resource create returned error or wrong resource: error %t, res %v", err, usage1) @@ -42,17 +42,17 @@ func TestAddRemoveUserAndGroups(t *testing.T) { t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v, error %t", queuePath1, TestApp1, usage1, err) } - assert.Equal(t, 1, len(manager.getUserTrackers()), "userTrackers count should be 1") - assert.Equal(t, 1, len(manager.getGroupTrackers()), "groupTrackers count should be 1") + assert.Equal(t, 1, len(manager.GetUserTrackers()), "userTrackers count should be 1") + assert.Equal(t, 1, len(manager.GetGroupTrackers()), "groupTrackers count should be 1") err = manager.IncreaseTrackedResource(queuePath1, TestApp1, usage1, user) if err != nil { t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v, error %t", queuePath1, TestApp1, usage1, err) } - assert.Equal(t, 1, len(manager.getUserTrackers()), "userTrackers count should be 1") - assert.Equal(t, 1, len(manager.getGroupTrackers()), "groupTrackers count should be 1") + assert.Equal(t, 1, len(manager.GetUserTrackers()), "userTrackers count should be 1") + assert.Equal(t, 1, len(manager.GetGroupTrackers()), "groupTrackers count should be 1") - user1 := &security.UserGroup{User: "test1"} + user1 := security.UserGroup{User: "test1"} usage2, err := resources.NewResourceFromConf(map[string]string{"mem": "20M", "vcore": "20"}) if err != nil { t.Errorf("new resource create returned error or wrong resource: error %t, res %v", err, usage2) @@ -61,8 +61,8 @@ func TestAddRemoveUserAndGroups(t *testing.T) { if err != nil { t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v, error %t", queuePath2, TestApp2, usage2, err) } - assert.Equal(t, 2, len(manager.getUserTrackers()), "userTrackers count should be 2") - assert.Equal(t, 2, len(manager.getGroupTrackers()), "groupTrackers count should be 2") + assert.Equal(t, 2, len(manager.GetUserTrackers()), "userTrackers count should be 2") + assert.Equal(t, 2, len(manager.GetGroupTrackers()), "groupTrackers count should be 2") usage3, err := resources.NewResourceFromConf(map[string]string{"mem": "5M", "vcore": "5"}) if err != nil { @@ -72,20 +72,20 @@ func TestAddRemoveUserAndGroups(t *testing.T) { if err != nil { t.Fatalf("unable to decrease tracked resource: queuepath %s, app %s, res %v, error %t", queuePath1, TestApp1, usage3, err) } - assert.Equal(t, 2, len(manager.getUserTrackers()), "userTrackers count should be 2") - assert.Equal(t, 2, len(manager.getGroupTrackers()), "groupTrackers count should be 2") + assert.Equal(t, 2, len(manager.GetUserTrackers()), "userTrackers count should be 2") + assert.Equal(t, 2, len(manager.GetGroupTrackers()), "groupTrackers count should be 2") err = manager.DecreaseTrackedResource(queuePath1, TestApp1, usage3, user, true) if err != nil { t.Fatalf("unable to decrease tracked resource: queuepath %s, app %s, res %v, error %t", queuePath1, TestApp1, usage3, err) } - assert.Equal(t, 1, len(manager.getUserTrackers()), "userTrackers count should be 1") - assert.Equal(t, 1, len(manager.getGroupTrackers()), "groupTrackers count should be 1") + assert.Equal(t, 1, len(manager.GetUserTrackers()), "userTrackers count should be 1") + assert.Equal(t, 1, len(manager.GetGroupTrackers()), "groupTrackers count should be 1") err = manager.DecreaseTrackedResource(queuePath2, TestApp2, usage2, user1, true) if err != nil { t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v, error %t", queuePath2, TestApp2, usage2, err) } - assert.Equal(t, 0, len(manager.getUserTrackers()), "userTrackers count should be 0") - assert.Equal(t, 0, len(manager.getGroupTrackers()), "groupTrackers count should be 0") + assert.Equal(t, 0, len(manager.GetUserTrackers()), "userTrackers count should be 0") + assert.Equal(t, 0, len(manager.GetGroupTrackers()), "groupTrackers count should be 0") } diff --git a/pkg/scheduler/ugm/user_tracker.go b/pkg/scheduler/ugm/user_tracker.go index dcab7a09e..4b2792db8 100644 --- a/pkg/scheduler/ugm/user_tracker.go +++ b/pkg/scheduler/ugm/user_tracker.go @@ -33,7 +33,7 @@ type UserTracker struct { sync.RWMutex } -func NewUserTracker(user *security.UserGroup) *UserTracker { +func NewUserTracker(user security.UserGroup) *UserTracker { queueTracker := NewQueueTracker("root") userTracker := &UserTracker{ userName: user.User, diff --git a/pkg/scheduler/ugm/user_tracker_test.go b/pkg/scheduler/ugm/user_tracker_test.go index 29af20f53..21590f79a 100644 --- a/pkg/scheduler/ugm/user_tracker_test.go +++ b/pkg/scheduler/ugm/user_tracker_test.go @@ -44,7 +44,7 @@ func TestIncreaseTrackedResource(t *testing.T) { // root->parent->child1->child12 // root->parent->child2 // root->parent->child12 (similar name like above leaf queue, but it is being treated differently as similar names are allowed) - user := &security.UserGroup{User: "test"} + user := security.UserGroup{User: "test"} userTracker := NewUserTracker(user) usage1, err := resources.NewResourceFromConf(map[string]string{"mem": "10M", "vcore": "10"}) if err != nil { @@ -104,7 +104,7 @@ func TestDecreaseTrackedResource(t *testing.T) { // Queue setup: // root->parent->child1 // root->parent->child2 - user := &security.UserGroup{User: "test"} + user := security.UserGroup{User: "test"} userTracker := NewUserTracker(user) usage1, err := resources.NewResourceFromConf(map[string]string{"mem": "70M", "vcore": "70"})