diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go index 905e903b8..e37c4a9cb 100644 --- a/pkg/scheduler/objects/application.go +++ b/pkg/scheduler/objects/application.go @@ -36,6 +36,7 @@ import ( "github.com/apache/yunikorn-core/pkg/log" "github.com/apache/yunikorn-core/pkg/metrics" "github.com/apache/yunikorn-core/pkg/rmproxy/rmevent" + "github.com/apache/yunikorn-core/pkg/scheduler/ugm" siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common" "github.com/apache/yunikorn-scheduler-interface/lib/go/si" ) @@ -1420,8 +1421,11 @@ func (sa *Application) addAllocationInternal(info *Allocation) { if resources.IsZero(sa.allocatedPlaceholder) { sa.initPlaceholderTimer() } + // User resource usage needs to be updated even during resource allocation happen for ph's itself even though state change would happen only after all ph allocation completes. + sa.incUserResourceUsage(info.GetAllocatedResource()) sa.allocatedPlaceholder = resources.Add(sa.allocatedPlaceholder, info.GetAllocatedResource()) sa.maxAllocatedResource = resources.ComponentWiseMax(sa.allocatedPlaceholder, sa.maxAllocatedResource) + // If there are no more placeholder to allocate we should move state if resources.Equals(sa.allocatedPlaceholder, sa.placeholderAsk) { if err := sa.HandleApplicationEvent(RunApplication); err != nil { @@ -1442,12 +1446,33 @@ func (sa *Application) addAllocationInternal(info *Allocation) { zap.Error(err)) } } + sa.incUserResourceUsage(info.GetAllocatedResource()) sa.allocatedResource = resources.Add(sa.allocatedResource, info.GetAllocatedResource()) sa.maxAllocatedResource = resources.ComponentWiseMax(sa.allocatedResource, sa.maxAllocatedResource) } sa.allocations[info.GetUUID()] = info } +func (sa *Application) incUserResourceUsage(resource *resources.Resource) { + if err := ugm.GetUserManager().IncreaseTrackedResource(sa.GetQueuePath(), sa.ApplicationID, resource, sa.GetUser()); err != nil { + log.Logger().Error("Unable to track the user resource usage", + zap.String("application id", sa.ApplicationID), + zap.String("user", sa.GetUser().User), + zap.String("currentState", sa.stateMachine.Current()), + zap.Error(err)) + } +} + +func (sa *Application) decUserResourceUsage(resource *resources.Resource, removeApp bool) { + if err := ugm.GetUserManager().DecreaseTrackedResource(sa.GetQueuePath(), sa.ApplicationID, resource, sa.GetUser(), removeApp); err != nil { + log.Logger().Error("Unable to track the user resource usage", + zap.String("application id", sa.ApplicationID), + zap.String("user", sa.GetUser().User), + zap.String("currentState", sa.stateMachine.Current()), + zap.Error(err)) + } +} + func (sa *Application) ReplaceAllocation(uuid string) *Allocation { sa.Lock() defer sa.Unlock() @@ -1529,6 +1554,7 @@ func (sa *Application) removeAllocationInternal(uuid string, releaseType si.Term } } } else { + sa.decUserResourceUsage(alloc.GetAllocatedResource(), false) sa.allocatedResource = resources.Sub(sa.allocatedResource, alloc.GetAllocatedResource()) // When the resource trackers are zero we should not expect anything to come in later. if sa.hasZeroAllocations() { diff --git a/pkg/scheduler/objects/application_state.go b/pkg/scheduler/objects/application_state.go index 4447847a0..cd03f6d6a 100644 --- a/pkg/scheduler/objects/application_state.go +++ b/pkg/scheduler/objects/application_state.go @@ -22,12 +22,11 @@ import ( "fmt" "time" + "github.com/looplab/fsm" "go.uber.org/zap" "github.com/apache/yunikorn-core/pkg/log" "github.com/apache/yunikorn-core/pkg/metrics" - - "github.com/looplab/fsm" ) const noTransition = "no transition" @@ -148,10 +147,21 @@ func NewAppState() *fsm.FSM { fmt.Sprintf("enter_%s", Starting.String()): func(event *fsm.Event) { app := event.Args[0].(*Application) //nolint:errcheck app.setStateTimer(app.startTimeout, app.stateMachine.Current(), RunApplication) + metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsRunning() + metrics.GetSchedulerMetrics().IncTotalApplicationsRunning() + }, + fmt.Sprintf("enter_%s", Resuming.String()): func(event *fsm.Event) { + app := event.Args[0].(*Application) //nolint:errcheck + app.decUserResourceUsage(app.GetAllocatedResource(), true) + metrics.GetQueueMetrics(app.queuePath).DecQueueApplicationsRunning() + metrics.GetSchedulerMetrics().DecTotalApplicationsRunning() }, fmt.Sprintf("enter_%s", Completing.String()): func(event *fsm.Event) { app := event.Args[0].(*Application) //nolint:errcheck app.setStateTimer(completingTimeout, app.stateMachine.Current(), CompleteApplication) + app.decUserResourceUsage(app.GetAllocatedResource(), true) + metrics.GetQueueMetrics(app.queuePath).DecQueueApplicationsRunning() + metrics.GetSchedulerMetrics().DecTotalApplicationsRunning() }, fmt.Sprintf("leave_%s", New.String()): func(event *fsm.Event) { app := event.Args[0].(*Application) //nolint:errcheck @@ -169,16 +179,6 @@ func NewAppState() *fsm.FSM { app.rejectedMessage = event.Args[1].(string) //nolint:errcheck } }, - fmt.Sprintf("enter_%s", Running.String()): func(event *fsm.Event) { - app := event.Args[0].(*Application) //nolint:errcheck - metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsRunning() - metrics.GetSchedulerMetrics().IncTotalApplicationsRunning() - }, - fmt.Sprintf("leave_%s", Running.String()): func(event *fsm.Event) { - app := event.Args[0].(*Application) //nolint:errcheck - metrics.GetQueueMetrics(app.queuePath).DecQueueApplicationsRunning() - metrics.GetSchedulerMetrics().DecTotalApplicationsRunning() - }, fmt.Sprintf("enter_%s", Completed.String()): func(event *fsm.Event) { app := event.Args[0].(*Application) //nolint:errcheck metrics.GetSchedulerMetrics().IncTotalApplicationsCompleted() @@ -189,6 +189,7 @@ func NewAppState() *fsm.FSM { }, fmt.Sprintf("enter_%s", Failed.String()): func(event *fsm.Event) { app := event.Args[0].(*Application) //nolint:errcheck + app.decUserResourceUsage(app.GetAllocatedResource(), true) metrics.GetQueueMetrics(app.queuePath).DecQueueApplicationsRunning() metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsFailed() metrics.GetSchedulerMetrics().DecTotalApplicationsRunning() diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go index 55a9af1ff..5d501f3e8 100644 --- a/pkg/scheduler/partition.go +++ b/pkg/scheduler/partition.go @@ -38,6 +38,7 @@ import ( "github.com/apache/yunikorn-core/pkg/scheduler/objects" "github.com/apache/yunikorn-core/pkg/scheduler/placement" "github.com/apache/yunikorn-core/pkg/scheduler/policies" + "github.com/apache/yunikorn-core/pkg/scheduler/ugm" "github.com/apache/yunikorn-core/pkg/webservice/dao" "github.com/apache/yunikorn-scheduler-interface/lib/go/si" ) @@ -63,6 +64,7 @@ type PartitionContext struct { totalPartitionResource *resources.Resource // Total node resources allocations int // Number of allocations on the partition stateDumpFilePath string // Path of output file for state dumps + ugm *ugm.Manager // User group manager // The partition write lock must not be held while manipulating an application. // Scheduling is running continuously as a lock free background task. Scheduling an application @@ -95,6 +97,8 @@ func newPartitionContext(conf configs.PartitionConfig, rmID string, cc *ClusterC nodes: objects.NewNodeCollection(conf.Name), } pc.partitionManager = newPartitionManager(pc, cc) + ugm.Init() + pc.ugm = ugm.GetUserManager() if err := pc.initialPartitionFromConfig(conf); err != nil { return nil, err } diff --git a/pkg/scheduler/ugm/group_tracker_test.go b/pkg/scheduler/ugm/group_tracker_test.go index 07554a751..d4ecdb70a 100644 --- a/pkg/scheduler/ugm/group_tracker_test.go +++ b/pkg/scheduler/ugm/group_tracker_test.go @@ -34,6 +34,7 @@ func TestGTIncreaseTrackedResource(t *testing.T) { // root->parent->child12 (similar name like above leaf queue, but it is being treated differently as similar names are allowed) user := &security.UserGroup{User: "test", Groups: []string{"test"}} groupTracker := newGroupTracker(user.User) + usage1, err := resources.NewResourceFromConf(map[string]string{"mem": "10M", "vcore": "10"}) if err != nil { t.Errorf("new resource create returned error or wrong resource: error %t, res %v", err, usage1) diff --git a/pkg/scheduler/ugm/manager.go b/pkg/scheduler/ugm/manager.go index fe3e0b431..e6c9410f9 100644 --- a/pkg/scheduler/ugm/manager.go +++ b/pkg/scheduler/ugm/manager.go @@ -279,10 +279,10 @@ func (m *Manager) isGroupRemovable(gt *GroupTracker) bool { } // getUserTrackers only for tests -func (m *Manager) getUserTrackers() map[string]*UserTracker { +func (m *Manager) GetUserTrackers() map[string]*UserTracker { return m.userTrackers } -func (m *Manager) getGroupTrackers() map[string]*GroupTracker { +func (m *Manager) GetGroupTrackers() map[string]*GroupTracker { return m.groupTrackers } diff --git a/pkg/scheduler/ugm/manager_test.go b/pkg/scheduler/ugm/manager_test.go index 300c8c09a..eafc8bff6 100644 --- a/pkg/scheduler/ugm/manager_test.go +++ b/pkg/scheduler/ugm/manager_test.go @@ -65,22 +65,22 @@ func TestAddRemoveUserAndGroups(t *testing.T) { t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v, error %t", queuePath1, TestApp1, usage1, err) } - userTrackers := manager.getUserTrackers() + userTrackers := manager.GetUserTrackers() userTracker := userTrackers["test"] - groupTrackers := manager.getGroupTrackers() + groupTrackers := manager.GetGroupTrackers() groupTracker := groupTrackers["test"] assert.Equal(t, false, manager.isUserRemovable(userTracker)) assert.Equal(t, false, manager.isGroupRemovable(groupTracker)) - assert.Equal(t, 1, len(manager.getUserTrackers()), "userTrackers count should be 1") - assert.Equal(t, 1, len(manager.getGroupTrackers()), "groupTrackers count should be 1") + assert.Equal(t, 1, len(manager.GetUserTrackers()), "userTrackers count should be 1") + assert.Equal(t, 1, len(manager.GetGroupTrackers()), "groupTrackers count should be 1") err = manager.IncreaseTrackedResource(queuePath1, TestApp1, usage1, user) if err != nil { t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v, error %t", queuePath1, TestApp1, usage1, err) } - assert.Equal(t, 1, len(manager.getUserTrackers()), "userTrackers count should be 1") - assert.Equal(t, 1, len(manager.getGroupTrackers()), "groupTrackers count should be 1") + assert.Equal(t, 1, len(manager.GetUserTrackers()), "userTrackers count should be 1") + assert.Equal(t, 1, len(manager.GetGroupTrackers()), "groupTrackers count should be 1") user1 := security.UserGroup{User: "test1", Groups: []string{"test1"}} usage2, err := resources.NewResourceFromConf(map[string]string{"mem": "20M", "vcore": "20"}) @@ -92,8 +92,8 @@ func TestAddRemoveUserAndGroups(t *testing.T) { t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v, error %t", queuePath2, TestApp2, usage2, err) } - assert.Equal(t, 2, len(manager.getUserTrackers()), "userTrackers count should be 2") - assert.Equal(t, 2, len(manager.getGroupTrackers()), "groupTrackers count should be 2") + assert.Equal(t, 2, len(manager.GetUserTrackers()), "userTrackers count should be 2") + assert.Equal(t, 2, len(manager.GetGroupTrackers()), "groupTrackers count should be 2") usage3, err := resources.NewResourceFromConf(map[string]string{"mem": "5M", "vcore": "5"}) if err != nil { @@ -108,22 +108,22 @@ func TestAddRemoveUserAndGroups(t *testing.T) { t.Fatalf("unable to decrease tracked resource: queuepath %s, app %s, res %v, error %t", queuePath1, TestApp1, usage3, err) } - assert.Equal(t, 2, len(manager.getUserTrackers()), "userTrackers count should be 2") - assert.Equal(t, 2, len(manager.getGroupTrackers()), "groupTrackers count should be 2") + assert.Equal(t, 2, len(manager.GetUserTrackers()), "userTrackers count should be 2") + assert.Equal(t, 2, len(manager.GetGroupTrackers()), "groupTrackers count should be 2") err = manager.DecreaseTrackedResource(queuePath1, TestApp1, usage3, user, true) if err != nil { t.Fatalf("unable to decrease tracked resource: queuepath %s, app %s, res %v, error %t", queuePath1, TestApp1, usage3, err) } - assert.Equal(t, 1, len(manager.getUserTrackers()), "userTrackers count should be 1") - assert.Equal(t, 1, len(manager.getGroupTrackers()), "groupTrackers count should be 1") + assert.Equal(t, 1, len(manager.GetUserTrackers()), "userTrackers count should be 1") + assert.Equal(t, 1, len(manager.GetGroupTrackers()), "groupTrackers count should be 1") err = manager.DecreaseTrackedResource(queuePath2, TestApp2, usage2, user1, true) if err != nil { t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v, error %t", queuePath2, TestApp2, usage2, err) } - assert.Equal(t, 0, len(manager.getUserTrackers()), "userTrackers count should be 0") - assert.Equal(t, 0, len(manager.getGroupTrackers()), "groupTrackers count should be 0") + assert.Equal(t, 0, len(manager.GetUserTrackers()), "userTrackers count should be 0") + assert.Equal(t, 0, len(manager.GetGroupTrackers()), "groupTrackers count should be 0") }