Skip to content

Commit

Permalink
[YUNIKORN-1328] Handle application state changes and trigger tracker …
Browse files Browse the repository at this point in the history
…interfaces
  • Loading branch information
manirajv06 committed Oct 10, 2022
1 parent d57a390 commit 09ae825
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 37 deletions.
26 changes: 26 additions & 0 deletions pkg/scheduler/objects/application.go
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/apache/yunikorn-core/pkg/log"
"github.com/apache/yunikorn-core/pkg/metrics"
"github.com/apache/yunikorn-core/pkg/rmproxy/rmevent"
"github.com/apache/yunikorn-core/pkg/scheduler/ugm"
siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
Expand Down Expand Up @@ -1420,8 +1421,11 @@ func (sa *Application) addAllocationInternal(info *Allocation) {
if resources.IsZero(sa.allocatedPlaceholder) {
sa.initPlaceholderTimer()
}
// User resource usage needs to be updated even during resource allocation happen for ph's itself even though state change would happen only after all ph allocation completes.
sa.incUserResourceUsage(info.GetAllocatedResource())
sa.allocatedPlaceholder = resources.Add(sa.allocatedPlaceholder, info.GetAllocatedResource())
sa.maxAllocatedResource = resources.ComponentWiseMax(sa.allocatedPlaceholder, sa.maxAllocatedResource)

// If there are no more placeholder to allocate we should move state
if resources.Equals(sa.allocatedPlaceholder, sa.placeholderAsk) {
if err := sa.HandleApplicationEvent(RunApplication); err != nil {
Expand All @@ -1442,12 +1446,33 @@ func (sa *Application) addAllocationInternal(info *Allocation) {
zap.Error(err))
}
}
sa.incUserResourceUsage(info.GetAllocatedResource())
sa.allocatedResource = resources.Add(sa.allocatedResource, info.GetAllocatedResource())
sa.maxAllocatedResource = resources.ComponentWiseMax(sa.allocatedResource, sa.maxAllocatedResource)
}
sa.allocations[info.GetUUID()] = info
}

func (sa *Application) incUserResourceUsage(resource *resources.Resource) {
if err := ugm.GetUserManager().IncreaseTrackedResource(sa.GetQueuePath(), sa.ApplicationID, resource, sa.GetUser()); err != nil {
log.Logger().Error("Unable to track the user resource usage",
zap.String("application id", sa.ApplicationID),
zap.String("user", sa.GetUser().User),
zap.String("currentState", sa.stateMachine.Current()),
zap.Error(err))
}
}

func (sa *Application) decUserResourceUsage(resource *resources.Resource, removeApp bool) {
if err := ugm.GetUserManager().DecreaseTrackedResource(sa.GetQueuePath(), sa.ApplicationID, resource, sa.GetUser(), removeApp); err != nil {
log.Logger().Error("Unable to track the user resource usage",
zap.String("application id", sa.ApplicationID),
zap.String("user", sa.GetUser().User),
zap.String("currentState", sa.stateMachine.Current()),
zap.Error(err))
}
}

func (sa *Application) ReplaceAllocation(uuid string) *Allocation {
sa.Lock()
defer sa.Unlock()
Expand Down Expand Up @@ -1523,6 +1548,7 @@ func (sa *Application) removeAllocationInternal(uuid string) *Allocation {
}
}
} else {
sa.decUserResourceUsage(alloc.GetAllocatedResource(), false)
sa.allocatedResource = resources.Sub(sa.allocatedResource, alloc.GetAllocatedResource())
// When the resource trackers are zero we should not expect anything to come in later.
if sa.hasZeroAllocations() {
Expand Down
25 changes: 13 additions & 12 deletions pkg/scheduler/objects/application_state.go
Expand Up @@ -22,12 +22,11 @@ import (
"fmt"
"time"

"github.com/looplab/fsm"
"go.uber.org/zap"

"github.com/apache/yunikorn-core/pkg/log"
"github.com/apache/yunikorn-core/pkg/metrics"

"github.com/looplab/fsm"
)

const noTransition = "no transition"
Expand Down Expand Up @@ -148,10 +147,21 @@ func NewAppState() *fsm.FSM {
fmt.Sprintf("enter_%s", Starting.String()): func(event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
app.setStateTimer(app.startTimeout, app.stateMachine.Current(), RunApplication)
metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsRunning()
metrics.GetSchedulerMetrics().IncTotalApplicationsRunning()
},
fmt.Sprintf("enter_%s", Resuming.String()): func(event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
app.decUserResourceUsage(app.GetAllocatedResource(), true)
metrics.GetQueueMetrics(app.queuePath).DecQueueApplicationsRunning()
metrics.GetSchedulerMetrics().DecTotalApplicationsRunning()
},
fmt.Sprintf("enter_%s", Completing.String()): func(event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
app.setStateTimer(completingTimeout, app.stateMachine.Current(), CompleteApplication)
app.decUserResourceUsage(app.GetAllocatedResource(), true)
metrics.GetQueueMetrics(app.queuePath).DecQueueApplicationsRunning()
metrics.GetSchedulerMetrics().DecTotalApplicationsRunning()
},
fmt.Sprintf("leave_%s", New.String()): func(event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
Expand All @@ -169,16 +179,6 @@ func NewAppState() *fsm.FSM {
app.rejectedMessage = event.Args[1].(string) //nolint:errcheck
}
},
fmt.Sprintf("enter_%s", Running.String()): func(event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsRunning()
metrics.GetSchedulerMetrics().IncTotalApplicationsRunning()
},
fmt.Sprintf("leave_%s", Running.String()): func(event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
metrics.GetQueueMetrics(app.queuePath).DecQueueApplicationsRunning()
metrics.GetSchedulerMetrics().DecTotalApplicationsRunning()
},
fmt.Sprintf("enter_%s", Completed.String()): func(event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
metrics.GetSchedulerMetrics().IncTotalApplicationsCompleted()
Expand All @@ -189,6 +189,7 @@ func NewAppState() *fsm.FSM {
},
fmt.Sprintf("enter_%s", Failed.String()): func(event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
app.decUserResourceUsage(app.GetAllocatedResource(), true)
metrics.GetQueueMetrics(app.queuePath).DecQueueApplicationsRunning()
metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsFailed()
metrics.GetSchedulerMetrics().DecTotalApplicationsRunning()
Expand Down
4 changes: 4 additions & 0 deletions pkg/scheduler/partition.go
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/apache/yunikorn-core/pkg/scheduler/objects"
"github.com/apache/yunikorn-core/pkg/scheduler/placement"
"github.com/apache/yunikorn-core/pkg/scheduler/policies"
"github.com/apache/yunikorn-core/pkg/scheduler/ugm"
"github.com/apache/yunikorn-core/pkg/webservice/dao"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
Expand All @@ -63,6 +64,7 @@ type PartitionContext struct {
totalPartitionResource *resources.Resource // Total node resources
allocations int // Number of allocations on the partition
stateDumpFilePath string // Path of output file for state dumps
ugm *ugm.Manager // User group manager

// The partition write lock must not be held while manipulating an application.
// Scheduling is running continuously as a lock free background task. Scheduling an application
Expand Down Expand Up @@ -95,6 +97,8 @@ func newPartitionContext(conf configs.PartitionConfig, rmID string, cc *ClusterC
nodes: objects.NewNodeCollection(conf.Name),
}
pc.partitionManager = newPartitionManager(pc, cc)
ugm.Init()
pc.ugm = ugm.GetUserManager()
if err := pc.initialPartitionFromConfig(conf); err != nil {
return nil, err
}
Expand Down
30 changes: 22 additions & 8 deletions pkg/scheduler/ugm/manager.go
Expand Up @@ -29,21 +29,35 @@ import (
"github.com/apache/yunikorn-core/pkg/log"
)

var once sync.Once
var m *Manager

type Manager struct {
userTrackers map[string]*UserTracker
groupTrackers map[string]*GroupTracker
sync.RWMutex
}

func Init() {
once.Do(func() {
m = NewManager()
})
}

func NewManager() *Manager {
manager := &Manager{
userTrackers: make(map[string]*UserTracker),
groupTrackers: make(map[string]*GroupTracker),
}
return manager
}
func (m *Manager) IncreaseTrackedResource(queuePath string, applicationID string, usage *resources.Resource, user *security.UserGroup) error {
if queuePath == "" || applicationID == "" || usage == nil || user == nil {

func GetUserManager() *Manager {
return m
}

func (m *Manager) IncreaseTrackedResource(queuePath string, applicationID string, usage *resources.Resource, user security.UserGroup) error {
if queuePath == "" || applicationID == "" || usage == nil || user.User == "" {
return fmt.Errorf("mandatory parameters are missing. queuepath: %s, application id: %s, resource usage: %s, user: %s",
queuePath, applicationID, usage.String(), user.User)
}
Expand Down Expand Up @@ -71,8 +85,8 @@ func (m *Manager) IncreaseTrackedResource(queuePath string, applicationID string
return nil
}

func (m *Manager) DecreaseTrackedResource(queuePath string, applicationID string, usage *resources.Resource, user *security.UserGroup, removeApp bool) error {
if queuePath == "" || applicationID == "" || usage == nil || user == nil {
func (m *Manager) DecreaseTrackedResource(queuePath string, applicationID string, usage *resources.Resource, user security.UserGroup, removeApp bool) error {
if queuePath == "" || applicationID == "" || usage == nil || user.User == "" {
return fmt.Errorf("mandatory parameters are missing. queuepath: %s, application id: %s, resource usage: %s, user: %s",
queuePath, applicationID, usage.String(), user.User)
}
Expand Down Expand Up @@ -114,7 +128,7 @@ func (m *Manager) DecreaseTrackedResource(queuePath string, applicationID string
return nil
}

func (m *Manager) ensureGroupTrackerForApp(applicationID string, user *security.UserGroup) {
func (m *Manager) ensureGroupTrackerForApp(applicationID string, user security.UserGroup) {
userTracker := m.userTrackers[user.User]
if !userTracker.hasGroupForApp(applicationID) {
var groupTracker *GroupTracker
Expand All @@ -131,7 +145,7 @@ func (m *Manager) ensureGroupTrackerForApp(applicationID string, user *security.

// getGroup Based on the current limitations, group name and username is same. hence, using username as group name.
// It would be changed in future based on user group resolution, limit configuration processing etc
func (m *Manager) getGroup(user *security.UserGroup) string {
func (m *Manager) getGroup(user security.UserGroup) string {
return user.User
}

Expand Down Expand Up @@ -167,10 +181,10 @@ func (m *Manager) isGroupRemovable(gt *GroupTracker) bool {
}

// only for tests
func (m *Manager) getUserTrackers() map[string]*UserTracker {
func (m *Manager) GetUserTrackers() map[string]*UserTracker {
return m.userTrackers
}

func (m *Manager) getGroupTrackers() map[string]*GroupTracker {
func (m *Manager) GetGroupTrackers() map[string]*GroupTracker {
return m.groupTrackers
}
28 changes: 14 additions & 14 deletions pkg/scheduler/ugm/manager_test.go
Expand Up @@ -31,7 +31,7 @@ func TestAddRemoveUserAndGroups(t *testing.T) {
// Queue setup:
// root->parent->child1
// root->parent->child2
user := &security.UserGroup{User: "test"}
user := security.UserGroup{User: "test"}
usage1, err := resources.NewResourceFromConf(map[string]string{"mem": "5M", "vcore": "5"})
if err != nil {
t.Errorf("new resource create returned error or wrong resource: error %t, res %v", err, usage1)
Expand All @@ -42,17 +42,17 @@ func TestAddRemoveUserAndGroups(t *testing.T) {
t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v, error %t", queuePath1, TestApp1, usage1, err)
}

assert.Equal(t, 1, len(manager.getUserTrackers()), "userTrackers count should be 1")
assert.Equal(t, 1, len(manager.getGroupTrackers()), "groupTrackers count should be 1")
assert.Equal(t, 1, len(manager.GetUserTrackers()), "userTrackers count should be 1")
assert.Equal(t, 1, len(manager.GetGroupTrackers()), "groupTrackers count should be 1")

err = manager.IncreaseTrackedResource(queuePath1, TestApp1, usage1, user)
if err != nil {
t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v, error %t", queuePath1, TestApp1, usage1, err)
}
assert.Equal(t, 1, len(manager.getUserTrackers()), "userTrackers count should be 1")
assert.Equal(t, 1, len(manager.getGroupTrackers()), "groupTrackers count should be 1")
assert.Equal(t, 1, len(manager.GetUserTrackers()), "userTrackers count should be 1")
assert.Equal(t, 1, len(manager.GetGroupTrackers()), "groupTrackers count should be 1")

user1 := &security.UserGroup{User: "test1"}
user1 := security.UserGroup{User: "test1"}
usage2, err := resources.NewResourceFromConf(map[string]string{"mem": "20M", "vcore": "20"})
if err != nil {
t.Errorf("new resource create returned error or wrong resource: error %t, res %v", err, usage2)
Expand All @@ -61,8 +61,8 @@ func TestAddRemoveUserAndGroups(t *testing.T) {
if err != nil {
t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v, error %t", queuePath2, TestApp2, usage2, err)
}
assert.Equal(t, 2, len(manager.getUserTrackers()), "userTrackers count should be 2")
assert.Equal(t, 2, len(manager.getGroupTrackers()), "groupTrackers count should be 2")
assert.Equal(t, 2, len(manager.GetUserTrackers()), "userTrackers count should be 2")
assert.Equal(t, 2, len(manager.GetGroupTrackers()), "groupTrackers count should be 2")

usage3, err := resources.NewResourceFromConf(map[string]string{"mem": "5M", "vcore": "5"})
if err != nil {
Expand All @@ -72,20 +72,20 @@ func TestAddRemoveUserAndGroups(t *testing.T) {
if err != nil {
t.Fatalf("unable to decrease tracked resource: queuepath %s, app %s, res %v, error %t", queuePath1, TestApp1, usage3, err)
}
assert.Equal(t, 2, len(manager.getUserTrackers()), "userTrackers count should be 2")
assert.Equal(t, 2, len(manager.getGroupTrackers()), "groupTrackers count should be 2")
assert.Equal(t, 2, len(manager.GetUserTrackers()), "userTrackers count should be 2")
assert.Equal(t, 2, len(manager.GetGroupTrackers()), "groupTrackers count should be 2")

err = manager.DecreaseTrackedResource(queuePath1, TestApp1, usage3, user, true)
if err != nil {
t.Fatalf("unable to decrease tracked resource: queuepath %s, app %s, res %v, error %t", queuePath1, TestApp1, usage3, err)
}
assert.Equal(t, 1, len(manager.getUserTrackers()), "userTrackers count should be 1")
assert.Equal(t, 1, len(manager.getGroupTrackers()), "groupTrackers count should be 1")
assert.Equal(t, 1, len(manager.GetUserTrackers()), "userTrackers count should be 1")
assert.Equal(t, 1, len(manager.GetGroupTrackers()), "groupTrackers count should be 1")

err = manager.DecreaseTrackedResource(queuePath2, TestApp2, usage2, user1, true)
if err != nil {
t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v, error %t", queuePath2, TestApp2, usage2, err)
}
assert.Equal(t, 0, len(manager.getUserTrackers()), "userTrackers count should be 0")
assert.Equal(t, 0, len(manager.getGroupTrackers()), "groupTrackers count should be 0")
assert.Equal(t, 0, len(manager.GetUserTrackers()), "userTrackers count should be 0")
assert.Equal(t, 0, len(manager.GetGroupTrackers()), "groupTrackers count should be 0")
}
2 changes: 1 addition & 1 deletion pkg/scheduler/ugm/user_tracker.go
Expand Up @@ -33,7 +33,7 @@ type UserTracker struct {
sync.RWMutex
}

func NewUserTracker(user *security.UserGroup) *UserTracker {
func NewUserTracker(user security.UserGroup) *UserTracker {
queueTracker := NewQueueTracker("root")
userTracker := &UserTracker{
userName: user.User,
Expand Down
4 changes: 2 additions & 2 deletions pkg/scheduler/ugm/user_tracker_test.go
Expand Up @@ -44,7 +44,7 @@ func TestIncreaseTrackedResource(t *testing.T) {
// root->parent->child1->child12
// root->parent->child2
// root->parent->child12 (similar name like above leaf queue, but it is being treated differently as similar names are allowed)
user := &security.UserGroup{User: "test"}
user := security.UserGroup{User: "test"}
userTracker := NewUserTracker(user)
usage1, err := resources.NewResourceFromConf(map[string]string{"mem": "10M", "vcore": "10"})
if err != nil {
Expand Down Expand Up @@ -104,7 +104,7 @@ func TestDecreaseTrackedResource(t *testing.T) {
// Queue setup:
// root->parent->child1
// root->parent->child2
user := &security.UserGroup{User: "test"}
user := security.UserGroup{User: "test"}
userTracker := NewUserTracker(user)

usage1, err := resources.NewResourceFromConf(map[string]string{"mem": "70M", "vcore": "70"})
Expand Down

0 comments on commit 09ae825

Please sign in to comment.