Skip to content

Commit

Permalink
Merge branch 'master' into getapplication-resource-usage
Browse files Browse the repository at this point in the history
  • Loading branch information
richscott committed Jun 12, 2024
2 parents 8d87da5 + a786feb commit 6d1a112
Show file tree
Hide file tree
Showing 14 changed files with 168 additions and 81 deletions.
2 changes: 2 additions & 0 deletions pkg/common/configs/configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ const (
CMEventRingBufferCapacity = PrefixEvent + "ringBufferCapacity" // Ring Buffer Capacity
CMMaxEventStreams = PrefixEvent + "maxStreams"
CMMaxEventStreamsPerHost = PrefixEvent + "maxStreamsPerHost"
CMRESTResponseSize = PrefixEvent + "RESTResponseSize"

// defaults
DefaultHealthCheckInterval = 30 * time.Second
Expand All @@ -46,6 +47,7 @@ const (
DefaultEventRingBufferCapacity = 100000
DefaultMaxStreams = uint64(100)
DefaultMaxStreamsPerHost = uint64(15)
DefaultRESTResponseSize = uint64(10000)
)

var ConfigContext *SchedulerConfigContext
Expand Down
2 changes: 1 addition & 1 deletion pkg/common/configs/configvalidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ var QueueNameRegExp = regexp.MustCompile(`^[a-zA-Z0-9_-]{1,64}$`)

// User and group name check: systems allow different things POSIX is the base but we need to be lenient and allow more.
// allow upper and lower case, add the @ and . (dot) and officially no length.
var UserRegExp = regexp.MustCompile(`^[_a-zA-Z][a-zA-Z0-9:_.@-]*[$]?$`)
var UserRegExp = regexp.MustCompile(`^[_a-zA-Z][a-zA-Z0-9:#/_.@-]*[$]?$`)

// Groups should have a slightly more restrictive regexp (no @ . or $ at the end)
var GroupRegExp = regexp.MustCompile(`^[_a-zA-Z][a-zA-Z0-9:_.-]*$`)
Expand Down
1 change: 0 additions & 1 deletion pkg/common/configs/configvalidator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -742,7 +742,6 @@ func TestUserName(t *testing.T) {
rejectedUserNames := []string{
"username rejected",
"",
"rejected#",
"rejected!name",
"!rejected",
" rejected ",
Expand Down
6 changes: 6 additions & 0 deletions pkg/common/security/usergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"go.uber.org/zap"

"github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/common/configs"
"github.com/apache/yunikorn-core/pkg/locking"
"github.com/apache/yunikorn-core/pkg/log"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
Expand Down Expand Up @@ -153,6 +154,11 @@ func (c *UserGroupCache) ConvertUGI(ugi *si.UserGroupInformation, force bool) (U
return ug, err
}
}

if !configs.UserRegExp.MatchString(ugi.User) {
return UserGroup{}, fmt.Errorf("invalid username, it contains invalid characters")
}

// If groups are already present we should just convert
newUG := UserGroup{User: ugi.User}
newUG.Groups = append(newUG.Groups, ugi.Groups...)
Expand Down
14 changes: 14 additions & 0 deletions pkg/common/security/usergroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,4 +237,18 @@ func TestConvertUGI(t *testing.T) {
if ug.Groups[0] != group {
t.Errorf("groups not initialised correctly on convert: expected '%s' got '%s'", group, ug.Groups[0])
}
// try valid username with groups
ugi.User = "validuserABCD1234@://#"
ugi.Groups = []string{group}
ug, err = testCache.ConvertUGI(ugi, false)
if err != nil {
t.Errorf("valid username with groups, convert should not have failed: %v", err)
}
// try invalid username with groups
ugi.User = "invaliduser><+"
ugi.Groups = []string{group}
ug, err = testCache.ConvertUGI(ugi, false)
if err == nil {
t.Errorf("invalid username, convert should have failed: %v", err)
}
}
34 changes: 34 additions & 0 deletions pkg/common/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (

"gotest.tools/v3/assert"

"github.com/google/uuid"

"github.com/apache/yunikorn-scheduler-interface/lib/go/common"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
Expand Down Expand Up @@ -380,3 +382,35 @@ func TestGetConfigurationInt(t *testing.T) {
})
}
}

// TestZeroTimeInUnixNano verifies that ZeroTimeInUnixNano maps the zero time
// to a nil pointer and every other instant to a pointer holding its UnixNano
// value, independent of the location attached to the time.Time.
func TestZeroTimeInUnixNano(t *testing.T) {
	// the zero time must convert to a nil pointer
	if got := ZeroTimeInUnixNano(time.Time{}); got != nil {
		t.Fatalf("zero time should convert to nil, got %d", *got)
	}

	instants := []time.Time{
		// plain UTC instant
		time.Date(2024, time.June, 6, 0, 0, 0, 0, time.UTC),
		// same wall clock in a non-UTC zone: UnixNano must not depend on location
		time.Date(2024, time.June, 6, 0, 0, 0, 0, time.FixedZone("UTC+8", 8*60*60)),
	}
	for _, when := range instants {
		got := ZeroTimeInUnixNano(when)
		if got == nil {
			t.Fatalf("non-zero time %v should not convert to nil", when)
		}
		if *got != when.UnixNano() {
			t.Fatalf("unexpected conversion for %v: expected %d got %d", when, when.UnixNano(), *got)
		}
	}
}

// TestGetNewUUID checks that GetNewUUID returns a string that parses as a
// syntactically valid UUID.
func TestGetNewUUID(t *testing.T) {
	generated := GetNewUUID()
	_, parseErr := uuid.Parse(generated)
	if parseErr != nil {
		t.Errorf("Generated UUID is not valid: %s", generated)
	}
}

// TestIsRecoveryQueue verifies that recovery-queue detection accepts the
// special root recovery queue name case-insensitively and rejects all other
// queue names, including the empty string.
func TestIsRecoveryQueue(t *testing.T) {
	tests := []struct {
		queueName string
		recovery  bool
	}{
		// case-insensitive matches of the recovery queue name
		{"root.@recovery@", true},
		{"ROOT.@RECOVERY@", true},
		{"RoOT.@rECoVeRY@", true},
		// anything else is not the recovery queue
		{"otherQueue", false},
		{"", false},
	}
	for _, tc := range tests {
		if IsRecoveryQueue(tc.queueName) != tc.recovery {
			t.Fatalf("IsRecoveryQueue(%q) should be %v", tc.queueName, tc.recovery)
		}
	}
}
2 changes: 1 addition & 1 deletion pkg/scheduler/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (cc *ClusterContext) schedule() bool {
metrics.GetSchedulerMetrics().ObserveSchedulingLatency(schedulingStart)
if alloc.GetResult() == objects.Replaced {
// communicate the removal to the RM
cc.notifyRMAllocationReleased(psc.RmID, psc.Name, alloc.GetReleasesClone(), si.TerminationType_PLACEHOLDER_REPLACED, "replacing allocationKey: "+alloc.GetAllocationKey())
cc.notifyRMAllocationReleased(psc.RmID, psc.Name, []*objects.Allocation{alloc.GetRelease()}, si.TerminationType_PLACEHOLDER_REPLACED, "replacing allocationKey: "+alloc.GetAllocationKey())
} else {
cc.notifyRMNewAllocation(psc.RmID, alloc)
}
Expand Down
45 changes: 15 additions & 30 deletions pkg/scheduler/objects/allocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type Allocation struct {
released bool
reservedNodeID string
result AllocationResult
releases []*Allocation
release *Allocation
preempted bool
instType string

Expand Down Expand Up @@ -324,47 +324,32 @@ func (a *Allocation) SetResult(result AllocationResult) {
a.result = result
}

// GetReleasesClone returns a clone of the release list
func (a *Allocation) GetReleasesClone() []*Allocation {
// GetRelease returns the associated release for this allocation
func (a *Allocation) GetRelease() *Allocation {
a.RLock()
defer a.RUnlock()
result := make([]*Allocation, len(a.releases))
copy(result, a.releases)
return result
}

// GetFirstRelease returns the first release for this allocation
func (a *Allocation) GetFirstRelease() *Allocation {
a.RLock()
defer a.RUnlock()
return a.releases[0]
return a.release
}

// GetReleaseCount gets the number of releases associated with this allocation
func (a *Allocation) GetReleaseCount() int {
a.RLock()
defer a.RUnlock()
return len(a.releases)
}

// ClearReleases removes all releases from this allocation
func (a *Allocation) ClearReleases() {
// SetRelease sets the release for this allocation
func (a *Allocation) SetRelease(release *Allocation) {
a.Lock()
defer a.Unlock()
a.releases = nil
a.release = release
}

// AddRelease adds a new release to this allocation
func (a *Allocation) AddRelease(release *Allocation) {
// ClearRelease removes all releases from this allocation
func (a *Allocation) ClearRelease() {
a.Lock()
defer a.Unlock()
a.releases = append(a.releases, release)
a.release = nil
}

func (a *Allocation) SetRelease(release *Allocation) {
a.Lock()
defer a.Unlock()
a.releases = []*Allocation{release}
// HasRelease determines if this allocation has an associated release
func (a *Allocation) HasRelease() bool {
a.RLock()
defer a.RUnlock()
return a.release != nil
}

// GetAllocatedResource returns a reference to the allocated resources for this allocation. This must be treated as read-only.
Expand Down
13 changes: 3 additions & 10 deletions pkg/scheduler/objects/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -1722,30 +1722,23 @@ func (sa *Application) ReplaceAllocation(allocationKey string) *Allocation {
// remove the placeholder that was just confirmed by the shim
ph := sa.removeAllocationInternal(allocationKey, si.TerminationType_PLACEHOLDER_REPLACED)
// this has already been replaced or it is a duplicate message from the shim
if ph == nil || ph.GetReleaseCount() == 0 {
if ph == nil || !ph.HasRelease() {
log.Log(log.SchedApplication).Debug("Unexpected placeholder released",
zap.String("applicationID", sa.ApplicationID),
zap.Stringer("placeholder", ph))
return nil
}
// weird issue we should never have more than 1 log it for debugging this error
if ph.GetReleaseCount() > 1 {
log.Log(log.SchedApplication).Error("Unexpected release number, placeholder released, only 1 real allocations processed",
zap.String("applicationID", sa.ApplicationID),
zap.String("placeholderKey", allocationKey),
zap.Int("releases", ph.GetReleaseCount()))
}
// update the replacing allocation
// we double linked the real and placeholder allocation
// ph is the placeholder, the releases entry points to the real one
alloc := ph.GetFirstRelease()
alloc := ph.GetRelease()
alloc.SetPlaceholderUsed(true)
alloc.SetPlaceholderCreateTime(ph.GetCreateTime())
alloc.SetBindTime(time.Now())
sa.addAllocationInternal(alloc)
// order is important: clean up the allocation after adding it to the app
// we need the original Replaced allocation result.
alloc.ClearReleases()
alloc.ClearRelease()
alloc.SetResult(Allocated)
if sa.placeholderData != nil {
sa.placeholderData[ph.GetTaskGroup()].Replaced++
Expand Down
14 changes: 7 additions & 7 deletions pkg/scheduler/objects/application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1272,7 +1272,7 @@ func TestReplaceAllocation(t *testing.T) {
// set the real one to replace the placeholder
realAlloc := newAllocation(appID1, nodeID1, res)
realAlloc.SetResult(Replaced)
ph.AddRelease(realAlloc)
ph.SetRelease(realAlloc)
alloc = app.ReplaceAllocation(ph.GetAllocationKey())
assert.Equal(t, alloc, ph, "returned allocation is not the placeholder")
assert.Assert(t, resources.IsZero(app.allocatedPlaceholder), "real allocation counted as placeholder")
Expand All @@ -1285,7 +1285,7 @@ func TestReplaceAllocation(t *testing.T) {

// add the placeholder back to the app, the failure test above changed state and removed the ph
app.SetState(Running.String())
ph.ClearReleases()
ph.ClearRelease()
app.AddAllocation(ph)
app.addPlaceholderDataWithLocking(ph.GetAsk())
assert.Equal(t, app.placeholderData[""].Count, int64(3))
Expand All @@ -1294,10 +1294,10 @@ func TestReplaceAllocation(t *testing.T) {
// set multiple real allocations to replace the placeholder
realAlloc = newAllocation(appID1, nodeID1, res)
realAlloc.SetResult(Replaced)
ph.AddRelease(realAlloc)
ph.SetRelease(realAlloc)
realAllocNoAdd := newAllocation(appID1, nodeID1, res)
realAllocNoAdd.SetResult(Replaced)
ph.AddRelease(realAlloc)
ph.SetRelease(realAlloc)
alloc = app.ReplaceAllocation(ph.GetAllocationKey())
assert.Equal(t, alloc, ph, "returned allocation is not the placeholder")
assert.Assert(t, resources.IsZero(app.allocatedPlaceholder), "real allocation counted as placeholder")
Expand Down Expand Up @@ -1346,21 +1346,21 @@ func TestReplaceAllocationTracking(t *testing.T) {
// replace placeholders
realAlloc1 := newAllocation(appID1, nodeID1, res)
realAlloc1.SetResult(Replaced)
ph1.AddRelease(realAlloc1)
ph1.SetRelease(realAlloc1)
alloc1 := app.ReplaceAllocation(ph1.GetAllocationKey())
app.RemoveAllocation(ph1.GetAllocationKey(), si.TerminationType_PLACEHOLDER_REPLACED)
assert.Equal(t, ph1.GetAllocationKey(), alloc1.GetAllocationKey())
assert.Equal(t, true, app.HasPlaceholderAllocation())
realAlloc2 := newAllocation(appID1, nodeID1, res)
realAlloc2.SetResult(Replaced)
ph2.AddRelease(realAlloc2)
ph2.SetRelease(realAlloc2)
alloc2 := app.ReplaceAllocation(ph2.GetAllocationKey())
app.RemoveAllocation(ph2.GetAllocationKey(), si.TerminationType_PLACEHOLDER_REPLACED)
assert.Equal(t, ph2.GetAllocationKey(), alloc2.GetAllocationKey())
assert.Equal(t, true, app.HasPlaceholderAllocation())
realAlloc3 := newAllocation(appID1, nodeID1, res)
realAlloc3.SetResult(Replaced)
ph3.AddRelease(realAlloc3)
ph3.SetRelease(realAlloc3)
alloc3 := app.ReplaceAllocation(ph3.GetAllocationKey())
app.RemoveAllocation(ph3.GetAllocationKey(), si.TerminationType_PLACEHOLDER_REPLACED)
assert.Equal(t, ph3.GetAllocationKey(), alloc3.GetAllocationKey())
Expand Down
12 changes: 6 additions & 6 deletions pkg/scheduler/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,8 +716,8 @@ func (pc *PartitionContext) removeNodeAllocations(node *objects.Node) ([]*object
// Retrieve the queue early before a possible race.
queue := app.GetQueue()
// check for an inflight replacement.
if alloc.GetReleaseCount() != 0 {
release := alloc.GetFirstRelease()
if alloc.HasRelease() {
release := alloc.GetRelease()
// allocation to update the ask on: this needs to happen on the real alloc never the placeholder
askAlloc := alloc
// placeholder gets handled differently from normal
Expand Down Expand Up @@ -765,8 +765,8 @@ func (pc *PartitionContext) removeNodeAllocations(node *objects.Node) ([]*object
askAlloc = release
}
// unlink the placeholder and allocation
release.ClearReleases()
alloc.ClearReleases()
release.ClearRelease()
alloc.ClearRelease()
// mark ask as unallocated to get it re-scheduled
_, err := app.DeallocateAsk(askAlloc.GetAsk().GetAllocationKey())
if err == nil {
Expand Down Expand Up @@ -876,7 +876,7 @@ func (pc *PartitionContext) tryPlaceholderAllocate() *objects.Allocation {
log.Log(log.SchedPartition).Info("scheduler replace placeholder processed",
zap.String("appID", alloc.GetApplicationID()),
zap.String("allocationKey", alloc.GetAllocationKey()),
zap.String("placeholder released allocationKey", alloc.GetFirstRelease().GetAllocationKey()))
zap.String("placeholder released allocationKey", alloc.GetRelease().GetAllocationKey()))
// pass the release back to the RM via the cluster context
return alloc
}
Expand Down Expand Up @@ -1313,7 +1313,7 @@ func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease) ([]*
continue
}
if release.TerminationType == si.TerminationType_PLACEHOLDER_REPLACED {
confirmed = alloc.GetFirstRelease()
confirmed = alloc.GetRelease()
// we need to check the resources equality
delta := resources.Sub(confirmed.GetAllocatedResource(), alloc.GetAllocatedResource())
// Any negative value in the delta means that at least one of the requested resource in the
Expand Down
Loading

0 comments on commit 6d1a112

Please sign in to comment.