Skip to content

Commit

Permalink
scheduler: fix deviceshare plugin AddPod/RemovePod handling (koordinator-sh#…
Browse files Browse the repository at this point in the history
  • Loading branch information
xingbao.zy committed Mar 18, 2024
1 parent f3bc42a commit 2097d43
Show file tree
Hide file tree
Showing 6 changed files with 452 additions and 34 deletions.
15 changes: 15 additions & 0 deletions pkg/scheduler/plugins/deviceshare/device_resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package deviceshare
import (
"sort"

apiext "github.com/koordinator-sh/koordinator/apis/extension"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
quotav1 "k8s.io/apiserver/pkg/quota/v1"
Expand All @@ -33,6 +35,19 @@ import (
// "1": {koordinator.sh/gpu-core:100, koordinator.sh/gpu-memory-ratio:100, koordinator.sh/gpu-memory: 16GB}
type deviceResources map[int]corev1.ResourceList

// TransDeviceAllocationsToDeviceResources converts a DeviceAllocations
// mapping (deviceType -> list of per-device allocations) into the
// per-minor deviceResources form used by this plugin's caches.
// Each ResourceList is deep-copied so the result does not alias the input.
func TransDeviceAllocationsToDeviceResources(allocation apiext.DeviceAllocations) map[schedulingv1alpha1.DeviceType]deviceResources {
	// Pre-size the outer and inner maps; the final sizes are known.
	result := make(map[schedulingv1alpha1.DeviceType]deviceResources, len(allocation))

	for deviceType, deviceDetails := range allocation {
		resources := make(deviceResources, len(deviceDetails))
		for _, deviceDetail := range deviceDetails {
			// Minor identifies the device instance; DeepCopy keeps the
			// cached entry independent of the caller's allocation.
			resources[int(deviceDetail.Minor)] = deviceDetail.Resources.DeepCopy()
		}
		result[deviceType] = resources
	}

	return result
}

func (r deviceResources) DeepCopy() deviceResources {
if r == nil {
return nil
Expand Down
15 changes: 15 additions & 0 deletions pkg/scheduler/plugins/deviceshare/device_resources_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package deviceshare

import (
corev1 "k8s.io/api/core/v1"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -27,6 +28,20 @@ import (
schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
)

// TestTransDeviceAllocationsToDeviceResources verifies that a single GPU
// allocation is converted into the per-minor deviceResources form.
func TestTransDeviceAllocationsToDeviceResources(t *testing.T) {
	allocation := apiext.DeviceAllocations{
		schedulingv1alpha1.GPU: {
			{
				Minor: 0,
				Resources: corev1.ResourceList{
					apiext.ResourceGPUCore: resource.MustParse("100"),
				},
			},
		},
	}

	result := TransDeviceAllocationsToDeviceResources(allocation)
	gpuCore := result[schedulingv1alpha1.GPU][0][apiext.ResourceGPUCore]
	assert.Equal(t, "100", gpuCore.String())
}

func Test_sortDeviceResourcesByMinor(t *testing.T) {
tests := []struct {
name string
Expand Down
52 changes: 52 additions & 0 deletions pkg/scheduler/plugins/deviceshare/nominator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package deviceshare

import (
"sync"

schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
corev1 "k8s.io/api/core/v1"
)

// Nominator caches nominated device allocations for pods, keyed by the
// pod's "<namespace>/<name>" string. It is used to reserve placements for
// pods that are not yet present in the node device cache's used map
// (e.g. nominated pods still being scheduled — see the AddPod/RemovePod
// callers in plugin.go).
// The embedded Mutex guards nominateMap; every method takes it.
type Nominator struct {
	sync.Mutex
	nominateMap map[string]map[schedulingv1alpha1.DeviceType]deviceResources
}

// NewNominator constructs an empty Nominator ready to record per-pod
// nominated device allocations.
func NewNominator() *Nominator {
	nominator := &Nominator{}
	nominator.nominateMap = map[string]map[schedulingv1alpha1.DeviceType]deviceResources{}
	return nominator
}

// AddPod records the nominated device allocation for the given pod,
// keyed by its "<namespace>/<name>" string. A later call for the same pod
// overwrites the previous entry.
func (nominator *Nominator) AddPod(pod *corev1.Pod, used map[schedulingv1alpha1.DeviceType]deviceResources) {
	key := pod.Namespace + "/" + pod.Name

	nominator.Lock()
	nominator.nominateMap[key] = used
	nominator.Unlock()
}

// RemovePod drops any nominated allocation recorded for the given pod.
// It is a no-op when the pod was never added.
func (nominator *Nominator) RemovePod(pod *corev1.Pod) {
	key := pod.Namespace + "/" + pod.Name

	nominator.Lock()
	delete(nominator.nominateMap, key)
	nominator.Unlock()
}

// GetPodAllocate returns the nominated allocation recorded for the given
// pod, or nil when none is recorded.
func (nominator *Nominator) GetPodAllocate(pod *corev1.Pod) map[schedulingv1alpha1.DeviceType]deviceResources {
	key := pod.Namespace + "/" + pod.Name

	nominator.Lock()
	allocation := nominator.nominateMap[key]
	nominator.Unlock()

	return allocation
}

// IsPodExist reports whether a nominated allocation is currently recorded
// for the given pod.
func (nominator *Nominator) IsPodExist(pod *corev1.Pod) bool {
	key := pod.Namespace + "/" + pod.Name

	nominator.Lock()
	defer nominator.Unlock()

	_, exist := nominator.nominateMap[key]
	return exist
}
35 changes: 35 additions & 0 deletions pkg/scheduler/plugins/deviceshare/nominator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package deviceshare

import (
"testing"

schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
)

// TestNominator exercises the full add / lookup / exist / remove lifecycle
// of the Nominator.
func TestNominator(t *testing.T) {
	nominator := NewNominator()
	assert.Equal(t, 0, len(nominator.nominateMap))

	pod := &corev1.Pod{}
	pod.Namespace = "test"
	pod.Name = "job1"

	used := map[schedulingv1alpha1.DeviceType]deviceResources{
		schedulingv1alpha1.GPU: {
			0: corev1.ResourceList{
				corev1.ResourceCPU: resource.MustParse("10"),
			},
		},
	}

	nominator.AddPod(pod, used)
	assert.Equal(t, 1, len(nominator.nominateMap))
	assert.Equal(t, true, nominator.IsPodExist(pod))

	got := nominator.GetPodAllocate(pod)
	gotCPU := got[schedulingv1alpha1.GPU][0][corev1.ResourceCPU]
	assert.Equal(t, gotCPU.String(), "10")

	nominator.RemovePod(pod)
	assert.Equal(t, 0, len(nominator.nominateMap))
}
130 changes: 97 additions & 33 deletions pkg/scheduler/plugins/deviceshare/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type Plugin struct {
handle frameworkext.ExtendedHandle
nodeDeviceCache *nodeDeviceCache
scorer *resourceAllocationScorer
nominator *Nominator
}

type preFilterState struct {
Expand Down Expand Up @@ -178,12 +179,40 @@ func (p *Plugin) AddPod(ctx context.Context, cycleState *framework.CycleState, p
nd.lock.RLock()
defer nd.lock.RUnlock()

podAllocated := nd.getUsed(podInfoToAdd.Pod.Namespace, podInfoToAdd.Pod.Name)
var podAllocated map[schedulingv1alpha1.DeviceType]deviceResources
podAllocated = nd.getUsed(podInfoToAdd.Pod.Namespace, podInfoToAdd.Pod.Name)

	//In general, when the AddPod logic executes here, the pod may still be in the scheduling phase,
	//so it won't exist in nodeDeviceCache's used map. This led to a bug: when the framework executed
	//RunFilterPluginsWithNominatedPods with AddPod for high-priority pods, the plugin couldn't reserve resources
	//for those high-priority pods. In the RDMA\VF scenario this could cause high-priority pods to fail assignment
	//because some resources were already assigned to low-priority pods. So we reuse the "Reserve" logic to generate
	//an assignment placement and save it in the nominator. We clear the nominator cache in "Reserve" and "UnReserve",
	//which means the cleanup runs whether the assignment succeeds or not — the same process as the original
	//k8s framework's nominate handling.
if len(podAllocated) == 0 && p.nominator != nil {
if p.nominator.IsPodExist(podInfoToAdd.Pod) {
podAllocated = p.nominator.GetPodAllocate(podInfoToAdd.Pod)
} else {
assignFunc := func(state *preFilterState, nodeDeviceInfo *nodeDevice, pod *corev1.Pod, assignResult apiext.DeviceAllocations) {
//do nothing
}
result, _ := p.reserveInternal(ctx, cycleState, state, podInfoToAdd.Pod, nodeInfo, nd, assignFunc)
podAllocated = TransDeviceAllocationsToDeviceResources(result)

p.nominator.AddPod(podInfoToAdd.Pod, podAllocated)
}
}

if len(podAllocated) == 0 {
return nil
}

rInfo := reservation.GetReservationCache().GetReservationInfoByPod(podInfoToAdd.Pod, node.Name)
var rInfo *frameworkext.ReservationInfo
if reservation.GetReservationCache() != nil {
rInfo = reservation.GetReservationCache().GetReservationInfoByPod(podInfoToAdd.Pod, node.Name)
}

if rInfo == nil {
nominator := p.handle.GetReservationNominator()
if nominator != nil {
Expand Down Expand Up @@ -233,12 +262,25 @@ func (p *Plugin) RemovePod(ctx context.Context, cycleState *framework.CycleState
nd.lock.RLock()
defer nd.lock.RUnlock()

podAllocated := nd.getUsed(podInfoToRemove.Pod.Namespace, podInfoToRemove.Pod.Name)
var podAllocated map[schedulingv1alpha1.DeviceType]deviceResources
podAllocated = nd.getUsed(podInfoToRemove.Pod.Namespace, podInfoToRemove.Pod.Name)

if len(podAllocated) == 0 && p.nominator != nil {
if p.nominator.IsPodExist(podInfoToRemove.Pod) {
podAllocated = p.nominator.GetPodAllocate(podInfoToRemove.Pod)
p.nominator.RemovePod(podInfoToRemove.Pod)
}
}

if len(podAllocated) == 0 {
return nil
}

rInfo := reservation.GetReservationCache().GetReservationInfoByPod(podInfoToRemove.Pod, node.Name)
var rInfo *frameworkext.ReservationInfo
if reservation.GetReservationCache() != nil {
rInfo = reservation.GetReservationCache().GetReservationInfoByPod(podInfoToRemove.Pod, node.Name)
}

if rInfo == nil {
nominator := p.handle.GetReservationNominator()
if nominator != nil {
Expand Down Expand Up @@ -374,25 +416,8 @@ func (p *Plugin) FilterReservation(ctx context.Context, cycleState *framework.Cy
return status
}

func (p *Plugin) Reserve(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string) *framework.Status {
state, status := getPreFilterState(cycleState)
if !status.IsSuccess() {
return status
}
if state.skip {
return nil
}

nodeInfo, err := p.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
if err != nil {
return framework.AsStatus(err)
}

nodeDeviceInfo := p.nodeDeviceCache.getNodeDevice(nodeName, false)
if nodeDeviceInfo == nil {
return nil
}

func (p *Plugin) reserveInternal(ctx context.Context, cycleState *framework.CycleState, state *preFilterState, pod *corev1.Pod,
nodeInfo *framework.NodeInfo, nodeDeviceInfo *nodeDevice, assignFunc func(state *preFilterState, nodeDeviceInfo *nodeDevice, pod *corev1.Pod, assignResult apiext.DeviceAllocations)) (apiext.DeviceAllocations, *framework.Status) {
store := topologymanager.GetStore(cycleState)
affinity := store.GetAffinity(nodeInfo.Node().Name)

Expand All @@ -406,30 +431,68 @@ func (p *Plugin) Reserve(ctx context.Context, cycleState *framework.CycleState,
}

reservationRestoreState := getReservationRestoreState(cycleState)
restoreState := reservationRestoreState.getNodeState(nodeName)
preemptible := appendAllocated(nil, restoreState.mergedUnmatchedUsed, state.preemptibleDevices[nodeName])

nodeDeviceInfo.lock.Lock()
defer nodeDeviceInfo.lock.Unlock()
restoreState := reservationRestoreState.getNodeState(nodeInfo.Node().Name)
preemptible := appendAllocated(nil, restoreState.mergedUnmatchedUsed, state.preemptibleDevices[nodeInfo.Node().Name])

result, status := p.allocateWithNominatedReservation(
allocator, cycleState, state, restoreState, nodeInfo.Node(), pod, preemptible)
if !status.IsSuccess() {
return status
return nil, status
}
if len(result) == 0 {
preemptible = appendAllocated(preemptible, restoreState.mergedMatchedAllocatable)
result, status = allocator.Allocate(nil, nil, nil, preemptible)
if !status.IsSuccess() {
return status
return nil, status
}
}
nodeDeviceInfo.updateCacheUsed(result, pod, true)
state.allocationResult = result
return nil

assignFunc(state, nodeDeviceInfo, pod, result)

return result, nil
}

// Reserve allocates device resources for the pod on the given node and
// commits the result to the node device cache and the cycle state.
// Any nominator entry recorded for this pod during preemption simulation
// is cleared first, so the cache does not retain a stale placement whether
// or not the real reservation succeeds.
func (p *Plugin) Reserve(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string) *framework.Status {
	// Drop the placement nominated for this pod (added in AddPod); the
	// real assignment below supersedes it.
	if p.nominator != nil && p.nominator.IsPodExist(pod) {
		p.nominator.RemovePod(pod)
	}

	state, status := getPreFilterState(cycleState)
	if !status.IsSuccess() {
		return status
	}
	// skip presumably means the pod requests no device resources — set in
	// PreFilter (not visible here); TODO confirm.
	if state.skip {
		return nil
	}

	nodeInfo, err := p.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
	if err != nil {
		return framework.AsStatus(err)
	}

	// No cached device info for this node: nothing to reserve.
	nodeDeviceInfo := p.nodeDeviceCache.getNodeDevice(nodeName, false)
	if nodeDeviceInfo == nil {
		return nil
	}

	nodeDeviceInfo.lock.Lock()
	defer nodeDeviceInfo.lock.Unlock()

	// On a successful allocation, commit it to the node cache and stash it
	// in the cycle state for later plugin phases.
	assignFunc := func(state *preFilterState, nodeDeviceInfo *nodeDevice, pod *corev1.Pod, assignResult apiext.DeviceAllocations) {
		nodeDeviceInfo.updateCacheUsed(assignResult, pod, true)
		state.allocationResult = assignResult
	}

	_, status = p.reserveInternal(ctx, cycleState, state, pod, nodeInfo, nodeDeviceInfo, assignFunc)

	return status
}

func (p *Plugin) Unreserve(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string) {
if p.nominator != nil && p.nominator.IsPodExist(pod) {
p.nominator.RemovePod(pod)
}

state, status := getPreFilterState(cycleState)
if !status.IsSuccess() {
return
Expand Down Expand Up @@ -562,5 +625,6 @@ func New(obj runtime.Object, handle framework.Handle) (framework.Plugin, error)
handle: extendedHandle,
nodeDeviceCache: deviceCache,
scorer: scorePlugin(args),
nominator: NewNominator(),
}, nil
}
Loading

0 comments on commit 2097d43

Please sign in to comment.