Skip to content

Commit

Permalink
[YUNIKORN-1328] Handle application state changes and trigger tracker …
Browse files Browse the repository at this point in the history
…interfaces

[YUNIKORN-1328] Handle application state changes and trigger tracker interfaces
  • Loading branch information
manirajv06 committed Nov 21, 2022
1 parent 332d9d6 commit 95727ae
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 28 deletions.
26 changes: 26 additions & 0 deletions pkg/scheduler/objects/application.go
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/apache/yunikorn-core/pkg/log"
"github.com/apache/yunikorn-core/pkg/metrics"
"github.com/apache/yunikorn-core/pkg/rmproxy/rmevent"
"github.com/apache/yunikorn-core/pkg/scheduler/ugm"
siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
Expand Down Expand Up @@ -1420,8 +1421,11 @@ func (sa *Application) addAllocationInternal(info *Allocation) {
if resources.IsZero(sa.allocatedPlaceholder) {
sa.initPlaceholderTimer()
}
// User resource usage must be updated as each placeholder (ph) is allocated, even though the application state change happens only after all placeholder allocations complete.
sa.incUserResourceUsage(info.GetAllocatedResource())
sa.allocatedPlaceholder = resources.Add(sa.allocatedPlaceholder, info.GetAllocatedResource())
sa.maxAllocatedResource = resources.ComponentWiseMax(sa.allocatedPlaceholder, sa.maxAllocatedResource)

// If there are no more placeholders to allocate we should move state
if resources.Equals(sa.allocatedPlaceholder, sa.placeholderAsk) {
if err := sa.HandleApplicationEvent(RunApplication); err != nil {
Expand All @@ -1442,12 +1446,33 @@ func (sa *Application) addAllocationInternal(info *Allocation) {
zap.Error(err))
}
}
sa.incUserResourceUsage(info.GetAllocatedResource())
sa.allocatedResource = resources.Add(sa.allocatedResource, info.GetAllocatedResource())
sa.maxAllocatedResource = resources.ComponentWiseMax(sa.allocatedResource, sa.maxAllocatedResource)
}
sa.allocations[info.GetUUID()] = info
}

// incUserResourceUsage reports an increase of the given resource to the user
// manager for this application, identified by its queue path, application ID
// and user. A tracking failure is only logged (with the application ID, user
// and current state machine state); nothing is returned to the caller.
// NOTE(review): this is invoked from addAllocationInternal; confirm that
// GetQueuePath and GetUser are safe to call in that context.
func (sa *Application) incUserResourceUsage(resource *resources.Resource) {
	err := ugm.GetUserManager().IncreaseTrackedResource(sa.GetQueuePath(), sa.ApplicationID, resource, sa.GetUser())
	if err == nil {
		// usage tracked, nothing to report
		return
	}
	log.Logger().Error("Unable to track the user resource usage",
		zap.String("application id", sa.ApplicationID),
		zap.String("user", sa.GetUser().User),
		zap.String("currentState", sa.stateMachine.Current()),
		zap.Error(err))
}

// decUserResourceUsage reports a decrease of the given resource to the user
// manager for this application, identified by its queue path, application ID
// and user. The removeApp flag is passed through unchanged to
// DecreaseTrackedResource. A tracking failure is only logged (with the
// application ID, user and current state machine state); nothing is returned
// to the caller.
func (sa *Application) decUserResourceUsage(resource *resources.Resource, removeApp bool) {
	err := ugm.GetUserManager().DecreaseTrackedResource(sa.GetQueuePath(), sa.ApplicationID, resource, sa.GetUser(), removeApp)
	if err == nil {
		// usage tracked, nothing to report
		return
	}
	log.Logger().Error("Unable to track the user resource usage",
		zap.String("application id", sa.ApplicationID),
		zap.String("user", sa.GetUser().User),
		zap.String("currentState", sa.stateMachine.Current()),
		zap.Error(err))
}

func (sa *Application) ReplaceAllocation(uuid string) *Allocation {
sa.Lock()
defer sa.Unlock()
Expand Down Expand Up @@ -1529,6 +1554,7 @@ func (sa *Application) removeAllocationInternal(uuid string, releaseType si.Term
}
}
} else {
sa.decUserResourceUsage(alloc.GetAllocatedResource(), false)
sa.allocatedResource = resources.Sub(sa.allocatedResource, alloc.GetAllocatedResource())
// When the resource trackers are zero we should not expect anything to come in later.
if sa.hasZeroAllocations() {
Expand Down
25 changes: 13 additions & 12 deletions pkg/scheduler/objects/application_state.go
Expand Up @@ -22,12 +22,11 @@ import (
"fmt"
"time"

"github.com/looplab/fsm"
"go.uber.org/zap"

"github.com/apache/yunikorn-core/pkg/log"
"github.com/apache/yunikorn-core/pkg/metrics"

"github.com/looplab/fsm"
)

const noTransition = "no transition"
Expand Down Expand Up @@ -148,10 +147,21 @@ func NewAppState() *fsm.FSM {
fmt.Sprintf("enter_%s", Starting.String()): func(event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
app.setStateTimer(app.startTimeout, app.stateMachine.Current(), RunApplication)
metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsRunning()
metrics.GetSchedulerMetrics().IncTotalApplicationsRunning()
},
fmt.Sprintf("enter_%s", Resuming.String()): func(event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
app.decUserResourceUsage(app.GetAllocatedResource(), true)
metrics.GetQueueMetrics(app.queuePath).DecQueueApplicationsRunning()
metrics.GetSchedulerMetrics().DecTotalApplicationsRunning()
},
fmt.Sprintf("enter_%s", Completing.String()): func(event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
app.setStateTimer(completingTimeout, app.stateMachine.Current(), CompleteApplication)
app.decUserResourceUsage(app.GetAllocatedResource(), true)
metrics.GetQueueMetrics(app.queuePath).DecQueueApplicationsRunning()
metrics.GetSchedulerMetrics().DecTotalApplicationsRunning()
},
fmt.Sprintf("leave_%s", New.String()): func(event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
Expand All @@ -169,16 +179,6 @@ func NewAppState() *fsm.FSM {
app.rejectedMessage = event.Args[1].(string) //nolint:errcheck
}
},
fmt.Sprintf("enter_%s", Running.String()): func(event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsRunning()
metrics.GetSchedulerMetrics().IncTotalApplicationsRunning()
},
fmt.Sprintf("leave_%s", Running.String()): func(event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
metrics.GetQueueMetrics(app.queuePath).DecQueueApplicationsRunning()
metrics.GetSchedulerMetrics().DecTotalApplicationsRunning()
},
fmt.Sprintf("enter_%s", Completed.String()): func(event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
metrics.GetSchedulerMetrics().IncTotalApplicationsCompleted()
Expand All @@ -189,6 +189,7 @@ func NewAppState() *fsm.FSM {
},
fmt.Sprintf("enter_%s", Failed.String()): func(event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
app.decUserResourceUsage(app.GetAllocatedResource(), true)
metrics.GetQueueMetrics(app.queuePath).DecQueueApplicationsRunning()
metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsFailed()
metrics.GetSchedulerMetrics().DecTotalApplicationsRunning()
Expand Down
4 changes: 4 additions & 0 deletions pkg/scheduler/partition.go
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/apache/yunikorn-core/pkg/scheduler/objects"
"github.com/apache/yunikorn-core/pkg/scheduler/placement"
"github.com/apache/yunikorn-core/pkg/scheduler/policies"
"github.com/apache/yunikorn-core/pkg/scheduler/ugm"
"github.com/apache/yunikorn-core/pkg/webservice/dao"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
Expand All @@ -63,6 +64,7 @@ type PartitionContext struct {
totalPartitionResource *resources.Resource // Total node resources
allocations int // Number of allocations on the partition
stateDumpFilePath string // Path of output file for state dumps
ugm *ugm.Manager // User group manager

// The partition write lock must not be held while manipulating an application.
// Scheduling is running continuously as a lock free background task. Scheduling an application
Expand Down Expand Up @@ -95,6 +97,8 @@ func newPartitionContext(conf configs.PartitionConfig, rmID string, cc *ClusterC
nodes: objects.NewNodeCollection(conf.Name),
}
pc.partitionManager = newPartitionManager(pc, cc)
ugm.Init()
pc.ugm = ugm.GetUserManager()
if err := pc.initialPartitionFromConfig(conf); err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/scheduler/ugm/group_tracker_test.go
Expand Up @@ -34,6 +34,7 @@ func TestGTIncreaseTrackedResource(t *testing.T) {
// root->parent->child12 (similar name like above leaf queue, but it is being treated differently as similar names are allowed)
user := &security.UserGroup{User: "test", Groups: []string{"test"}}
groupTracker := newGroupTracker(user.User)

usage1, err := resources.NewResourceFromConf(map[string]string{"mem": "10M", "vcore": "10"})
if err != nil {
t.Errorf("new resource create returned error or wrong resource: error %t, res %v", err, usage1)
Expand Down
4 changes: 2 additions & 2 deletions pkg/scheduler/ugm/manager.go
Expand Up @@ -279,10 +279,10 @@ func (m *Manager) isGroupRemovable(gt *GroupTracker) bool {
}

// getUserTrackers only for tests
func (m *Manager) getUserTrackers() map[string]*UserTracker {
func (m *Manager) GetUserTrackers() map[string]*UserTracker {
return m.userTrackers
}

func (m *Manager) getGroupTrackers() map[string]*GroupTracker {
func (m *Manager) GetGroupTrackers() map[string]*GroupTracker {
return m.groupTrackers
}
28 changes: 14 additions & 14 deletions pkg/scheduler/ugm/manager_test.go
Expand Up @@ -65,22 +65,22 @@ func TestAddRemoveUserAndGroups(t *testing.T) {
t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v, error %t", queuePath1, TestApp1, usage1, err)
}

userTrackers := manager.getUserTrackers()
userTrackers := manager.GetUserTrackers()
userTracker := userTrackers["test"]
groupTrackers := manager.getGroupTrackers()
groupTrackers := manager.GetGroupTrackers()
groupTracker := groupTrackers["test"]
assert.Equal(t, false, manager.isUserRemovable(userTracker))
assert.Equal(t, false, manager.isGroupRemovable(groupTracker))

assert.Equal(t, 1, len(manager.getUserTrackers()), "userTrackers count should be 1")
assert.Equal(t, 1, len(manager.getGroupTrackers()), "groupTrackers count should be 1")
assert.Equal(t, 1, len(manager.GetUserTrackers()), "userTrackers count should be 1")
assert.Equal(t, 1, len(manager.GetGroupTrackers()), "groupTrackers count should be 1")

err = manager.IncreaseTrackedResource(queuePath1, TestApp1, usage1, user)
if err != nil {
t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v, error %t", queuePath1, TestApp1, usage1, err)
}
assert.Equal(t, 1, len(manager.getUserTrackers()), "userTrackers count should be 1")
assert.Equal(t, 1, len(manager.getGroupTrackers()), "groupTrackers count should be 1")
assert.Equal(t, 1, len(manager.GetUserTrackers()), "userTrackers count should be 1")
assert.Equal(t, 1, len(manager.GetGroupTrackers()), "groupTrackers count should be 1")

user1 := security.UserGroup{User: "test1", Groups: []string{"test1"}}
usage2, err := resources.NewResourceFromConf(map[string]string{"mem": "20M", "vcore": "20"})
Expand All @@ -92,8 +92,8 @@ func TestAddRemoveUserAndGroups(t *testing.T) {
t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v, error %t", queuePath2, TestApp2, usage2, err)
}

assert.Equal(t, 2, len(manager.getUserTrackers()), "userTrackers count should be 2")
assert.Equal(t, 2, len(manager.getGroupTrackers()), "groupTrackers count should be 2")
assert.Equal(t, 2, len(manager.GetUserTrackers()), "userTrackers count should be 2")
assert.Equal(t, 2, len(manager.GetGroupTrackers()), "groupTrackers count should be 2")

usage3, err := resources.NewResourceFromConf(map[string]string{"mem": "5M", "vcore": "5"})
if err != nil {
Expand All @@ -108,22 +108,22 @@ func TestAddRemoveUserAndGroups(t *testing.T) {
t.Fatalf("unable to decrease tracked resource: queuepath %s, app %s, res %v, error %t", queuePath1, TestApp1, usage3, err)
}

assert.Equal(t, 2, len(manager.getUserTrackers()), "userTrackers count should be 2")
assert.Equal(t, 2, len(manager.getGroupTrackers()), "groupTrackers count should be 2")
assert.Equal(t, 2, len(manager.GetUserTrackers()), "userTrackers count should be 2")
assert.Equal(t, 2, len(manager.GetGroupTrackers()), "groupTrackers count should be 2")

err = manager.DecreaseTrackedResource(queuePath1, TestApp1, usage3, user, true)
if err != nil {
t.Fatalf("unable to decrease tracked resource: queuepath %s, app %s, res %v, error %t", queuePath1, TestApp1, usage3, err)
}

assert.Equal(t, 1, len(manager.getUserTrackers()), "userTrackers count should be 1")
assert.Equal(t, 1, len(manager.getGroupTrackers()), "groupTrackers count should be 1")
assert.Equal(t, 1, len(manager.GetUserTrackers()), "userTrackers count should be 1")
assert.Equal(t, 1, len(manager.GetGroupTrackers()), "groupTrackers count should be 1")

err = manager.DecreaseTrackedResource(queuePath2, TestApp2, usage2, user1, true)
if err != nil {
t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v, error %t", queuePath2, TestApp2, usage2, err)
}

assert.Equal(t, 0, len(manager.getUserTrackers()), "userTrackers count should be 0")
assert.Equal(t, 0, len(manager.getGroupTrackers()), "groupTrackers count should be 0")
assert.Equal(t, 0, len(manager.GetUserTrackers()), "userTrackers count should be 0")
assert.Equal(t, 0, len(manager.GetGroupTrackers()), "groupTrackers count should be 0")
}

0 comments on commit 95727ae

Please sign in to comment.