Skip to content

Commit

Permalink
[YUNIKORN-832] Updating config can't remove partition (#319)
Browse files Browse the repository at this point in the history
All functions in partition_manager are using pass-by-value so setting the stop flag to true can't terminate other threads.
Also, the thread used to cleanup expired app can't be stopped quickly after removing partition because it needs to sleep for 24 hours.
  • Loading branch information
chia7712 committed Sep 27, 2021
1 parent 1e3399c commit cb96433
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 36 deletions.
2 changes: 1 addition & 1 deletion pkg/scheduler/context.go
Expand Up @@ -239,7 +239,7 @@ func (cc *ClusterContext) removePartitionsByRMID(event *rmevent.RMPartitionsRemo
// Just remove corresponding partitions
for k, partition := range cc.partitions {
if partition.RmID == event.RmID {
partition.partitionManager.stop = true
partition.partitionManager.Stop()
partitionToRemove[k] = true
}
}
Expand Down
5 changes: 1 addition & 4 deletions pkg/scheduler/partition.go
Expand Up @@ -92,10 +92,7 @@ func newPartitionContext(conf configs.PartitionConfig, rmID string, cc *ClusterC
reservedApps: make(map[string]int),
nodes: objects.NewNodeCollection(conf.Name),
}
pc.partitionManager = &partitionManager{
pc: pc,
cc: cc,
}
pc.partitionManager = newPartitionManager(pc, cc)
if err := pc.initialPartitionFromConfig(conf); err != nil {
return nil, err
}
Expand Down
87 changes: 56 additions & 31 deletions pkg/scheduler/partition_manager.go
Expand Up @@ -28,16 +28,28 @@ import (
)

const (
cleanerInterval = 10000 // sleep between queue removal checks
DefaultCleanRootInterval = 10000 * time.Millisecond // sleep between queue removal checks
DefaultCleanExpiredAppsInterval = 24 * time.Hour // sleep between apps removal checks
)

var appRemovalInterval = 24 * time.Hour

type partitionManager struct {
pc *PartitionContext
cc *ClusterContext
stop bool
interval time.Duration
pc *PartitionContext
cc *ClusterContext
stopCleanRoot chan struct{}
stopCleanExpiredApps chan struct{}
cleanRootInterval time.Duration
cleanExpiredAppsInterval time.Duration
}

func newPartitionManager(pc *PartitionContext, cc *ClusterContext) *partitionManager {
return &partitionManager{
pc: pc,
cc: cc,
stopCleanRoot: make(chan struct{}),
stopCleanExpiredApps: make(chan struct{}),
cleanRootInterval: DefaultCleanRootInterval,
cleanExpiredAppsInterval: DefaultCleanExpiredAppsInterval,
}
}

// Run the manager for the partition.
Expand All @@ -46,38 +58,46 @@ type partitionManager struct {
// - remove empty unmanaged queues
// - remove completed applications from the partition
// When the manager exits the partition is removed from the system and must be cleaned up
func (manager partitionManager) Run() {
if manager.interval == 0 {
manager.interval = cleanerInterval * time.Millisecond
}

func (manager *partitionManager) Run() {
log.Logger().Info("starting partition manager",
zap.String("partition", manager.pc.Name),
zap.String("interval", manager.interval.String()))
go manager.cleanupExpiredApps()
zap.String("cleanRootInterval", manager.cleanRootInterval.String()))
go manager.cleanExpiredApps()
go manager.cleanRoot()
}

func (manager *partitionManager) cleanRoot() {
// exit only when the partition this manager belongs to exits
for {
time.Sleep(manager.interval)
runStart := time.Now()
manager.cleanQueues(manager.pc.root)
if manager.stop {
break
cleanRootInterval := manager.cleanRootInterval
if cleanRootInterval <= 0 {
cleanRootInterval = DefaultCleanRootInterval
}
select {
case <-manager.stopCleanRoot:
return
case <-time.After(cleanRootInterval):
runStart := time.Now()
manager.cleanQueues(manager.pc.root)
log.Logger().Debug("time consumed for queue cleaner",
zap.String("duration", time.Since(runStart).String()))
}
log.Logger().Debug("time consumed for queue cleaner",
zap.String("duration", time.Since(runStart).String()))
}
manager.remove()
}

// Set the flag that the will allow the manager to exit.
// No locking needed as there is just one place where this is called which is already locked.
func (manager partitionManager) Stop() {
manager.stop = true
func (manager *partitionManager) Stop() {
go func() {
manager.stopCleanExpiredApps <- struct{}{}
manager.stopCleanRoot <- struct{}{}
manager.remove()
}()
}

// Remove drained managed and empty unmanaged queues. Perform the action recursively.
// Only called internally and recursive, no locking
func (manager partitionManager) cleanQueues(queue *objects.Queue) {
func (manager *partitionManager) cleanQueues(queue *objects.Queue) {
if queue == nil {
return
}
Expand Down Expand Up @@ -117,7 +137,7 @@ func (manager partitionManager) cleanQueues(queue *objects.Queue) {
// - nodes
// last action is to remove the cluster links
//nolint:errcheck
func (manager partitionManager) remove() {
func (manager *partitionManager) remove() {
log.Logger().Info("marking all queues for removal",
zap.String("partitionName", manager.pc.Name))
// mark all queues for removal
Expand Down Expand Up @@ -146,12 +166,17 @@ func (manager partitionManager) remove() {
manager.cc.removePartition(manager.pc.Name)
}

func (manager partitionManager) cleanupExpiredApps() {
func (manager *partitionManager) cleanExpiredApps() {
for {
if manager.stop {
break
cleanExpiredAppsInterval := manager.cleanExpiredAppsInterval
if cleanExpiredAppsInterval <= 0 {
cleanExpiredAppsInterval = DefaultCleanExpiredAppsInterval
}
select {
case <-manager.stopCleanExpiredApps:
return
case <-time.After(cleanExpiredAppsInterval):
manager.pc.cleanupExpiredApps()
}
manager.pc.cleanupExpiredApps()
time.Sleep(appRemovalInterval)
}
}
98 changes: 98 additions & 0 deletions pkg/scheduler/partition_manager_test.go
@@ -0,0 +1,98 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package scheduler

import (
"testing"

"gotest.tools/assert"

"github.com/apache/incubator-yunikorn-core/pkg/common/configs"
"github.com/apache/incubator-yunikorn-core/pkg/common/security"
"github.com/apache/incubator-yunikorn-core/pkg/scheduler/objects"
)

func createPartitionContext(t *testing.T) *PartitionContext {
conf := configs.PartitionConfig{
Name: "test",
Queues: []configs.QueueConfig{
{
Name: "root",
Parent: true,
SubmitACL: "*",
Queues: nil,
},
},
}
cc := &ClusterContext{}
partition, err := newPartitionContext(conf, "test", cc)
assert.NilError(t, err)
return partition
}

func TestStopPartitionManager(t *testing.T) {
p := createPartitionContext(t)

p.partitionManager.Stop()

// this call should not be blocked forever
p.partitionManager.cleanExpiredApps()

// this call should not be blocked forever
p.partitionManager.cleanRoot()
}

func TestCleanQueues(t *testing.T) {
p := createPartitionContext(t)

root := p.GetQueue("root")
assert.Assert(t, root != nil)

// add new queue to partition
queue, err := p.createQueue("root.test", security.UserGroup{})
assert.NilError(t, err)
assert.Equal(t, false, queue.IsManaged())
assert.Equal(t, 1, len(p.root.GetCopyOfChildren()))

// make sure all queues are removed
p.partitionManager.cleanQueues(p.root)
assert.Equal(t, 0, len(p.root.GetCopyOfChildren()))
}

func TestRemoveAll(t *testing.T) {
p := createPartitionContext(t)

_, err := p.createQueue("root.test", security.UserGroup{})
assert.NilError(t, err)

// add new node to partition
err = p.addNodeToList(&objects.Node{})
assert.NilError(t, err)
assert.Equal(t, 1, p.nodes.GetNodeCount())

// add new application to partition
err = p.AddApplication(newApplication("app", p.Name, "root.test"))
assert.NilError(t, err)
assert.Equal(t, 1, len(p.applications))

// make sure all nodes and applications are removed
p.partitionManager.remove()
assert.Equal(t, 0, len(p.applications))
assert.Equal(t, 0, p.nodes.GetNodeCount())
}

0 comments on commit cb96433

Please sign in to comment.