4 changes: 2 additions & 2 deletions charts/tensor-fusion/Chart.yaml
@@ -15,10 +15,10 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
-version: 1.7.3
+version: 1.7.4

# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: "1.48.5"
appVersion: "1.48.6"
2 changes: 1 addition & 1 deletion charts/tensor-fusion/values.yaml
@@ -31,7 +31,7 @@ controller:
  image:
    repository: tensorfusion/tensor-fusion-operator
    # Overrides the image tag whose default is the chart appVersion.
-   tag: "1.48.5"
+   tag: "1.48.6"
  # This is for setting Kubernetes Annotations to a Pod.
  # For more information, check out: https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/

109 changes: 109 additions & 0 deletions internal/scheduler/gpuresources/gpuresources.go
@@ -3,6 +3,7 @@ package gpuresources
import (
    "context"
    "encoding/json"
    "fmt"
    "slices"
    "strconv"
    "strings"
@@ -17,6 +18,8 @@ import (
"github.com/NexusGPU/tensor-fusion/internal/utils"
"github.com/samber/lo"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
@@ -36,6 +39,7 @@ var _ framework.FilterPlugin = &GPUFit{}
var _ framework.ScorePlugin = &GPUFit{}
var _ framework.ReservePlugin = &GPUFit{}
var _ framework.PostBindPlugin = &GPUFit{}
var _ framework.EnqueueExtensions = &GPUFit{}

type GPUFit struct {
    logger *klog.Logger
@@ -478,3 +482,108 @@ func (s *GPUFit) PostBind(ctx context.Context, state fwk.CycleState, pod *v1.Pod
"Attach GPU device ID info", "Attach TensorFusion GPU device IDs to Pod: "+gpuIDs)
}
}

func (s *GPUFit) EventsToRegister(_ context.Context) ([]fwk.ClusterEventWithHint, error) {
    // EventResource format must be: <plural-resource-name>.<version>.<group>
    // Example: gpus.v1.tensor-fusion.ai
    // The scheduler's eventhandlers.go will use DynInformerFactory to watch this resource
    gpuResource := fwk.EventResource("gpus.v1.tensor-fusion.ai")
    return []fwk.ClusterEventWithHint{
        {
            Event: fwk.ClusterEvent{
                Resource:   gpuResource,
                ActionType: fwk.Add | fwk.Update,
            },
            QueueingHintFn: s.queueingHint,
        },
    }, nil
}
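For context, a minimal sketch of how an event-resource string in this <plural>.<version>.<group> format could map onto the GroupVersionResource that a dynamic informer watches; parseEventResource is a hypothetical helper shown only to illustrate the naming convention, not part of this PR or the scheduler framework:

package main

import (
    "fmt"
    "strings"

    "k8s.io/apimachinery/pkg/runtime/schema"
)

// parseEventResource splits "<plural>.<version>.<group>" into a GVR.
// Hypothetical helper, for illustration of the format above only.
func parseEventResource(s string) (schema.GroupVersionResource, error) {
    parts := strings.SplitN(s, ".", 3)
    if len(parts) != 3 {
        return schema.GroupVersionResource{}, fmt.Errorf("want <plural>.<version>.<group>, got %q", s)
    }
    return schema.GroupVersionResource{Resource: parts[0], Version: parts[1], Group: parts[2]}, nil
}

func main() {
    gvr, err := parseEventResource("gpus.v1.tensor-fusion.ai")
    fmt.Println(gvr, err) // tensor-fusion.ai/v1, Resource=gpus <nil>
}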

// convertToGPU converts an interface{} to *tfv1.GPU, handling both typed and unstructured objects
func convertToGPU(obj interface{}) (*tfv1.GPU, error) {
    if obj == nil {
        return nil, nil
    }

    // Try a direct type assertion first (fastest path)
    if gpu, ok := obj.(*tfv1.GPU); ok {
        return gpu, nil
    }

    // Fall back to unstructured.Unstructured, which is what a dynamic informer delivers
    if unstructuredObj, ok := obj.(*unstructured.Unstructured); ok {
        gpu := &tfv1.GPU{}
        if err := runtime.DefaultUnstructuredConverter.FromUnstructured(unstructuredObj.Object, gpu); err != nil {
            return nil, fmt.Errorf("failed to convert unstructured to GPU: %w", err)
        }
        return gpu, nil
    }
    return nil, fmt.Errorf("cannot convert %T to *tfv1.GPU", obj)
}
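A self-contained sketch of the fallback path, using a stand-in struct since tfv1.GPU lives in this repo's internal packages; it shows how runtime.DefaultUnstructuredConverter.FromUnstructured maps an unstructured object's fields onto a typed value via JSON tags:

package main

import (
    "fmt"

    "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    "k8s.io/apimachinery/pkg/runtime"
)

// gpu is a stand-in for tfv1.GPU, for illustration only.
type gpu struct {
    Status struct {
        Phase string `json:"phase"`
    } `json:"status"`
}

func main() {
    // A dynamic informer hands objects over as *unstructured.Unstructured,
    // not as typed structs.
    u := &unstructured.Unstructured{Object: map[string]interface{}{
        "status": map[string]interface{}{"phase": "Running"},
    }}
    out := &gpu{}
    if err := runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, out); err != nil {
        panic(err)
    }
    fmt.Println(out.Status.Phase) // Running
}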

func (s *GPUFit) queueingHint(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (fwk.QueueingHint, error) {
    // Only process TensorFusion worker pods
    if !utils.IsTensorFusionWorker(pod) {
        return fwk.QueueSkip, nil
    }

    oldGPU, err := convertToGPU(oldObj)
    if err != nil {
        logger.V(5).Info("Failed to convert oldObj to GPU, skip", "error", err)
        return fwk.QueueSkip, nil
    }

    newGPU, err := convertToGPU(newObj)
    if err != nil {
        logger.V(5).Info("Failed to convert newObj to GPU, skip", "error", err)
        return fwk.QueueSkip, nil
    }

    // Calculate the increase in available resources
    var increaseTflops, increaseVram resource.Quantity
    if oldGPU == nil && newGPU != nil {
        // Add event: treat all available resources as the increase
        if newGPU.Status.Available != nil {
            increaseTflops = newGPU.Status.Available.Tflops.DeepCopy()
            increaseVram = newGPU.Status.Available.Vram.DeepCopy()
        }
    } else if oldGPU != nil && newGPU != nil {
        // Update event: diff the old and new available resources
        if oldGPU.Status.Available != nil && newGPU.Status.Available != nil {
            increaseTflops = newGPU.Status.Available.Tflops.DeepCopy()
            increaseVram = newGPU.Status.Available.Vram.DeepCopy()
            increaseTflops.Sub(oldGPU.Status.Available.Tflops)
            increaseVram.Sub(oldGPU.Status.Available.Vram)
        }
    }

    // Skip if neither resource increased
    if increaseTflops.Sign() <= 0 && increaseVram.Sign() <= 0 {
        return fwk.QueueSkip, nil
    }

    // Compose the allocation request for the pod passed in by the scheduler framework
    allocRequest, _, err := s.allocator.ComposeAllocationRequest(pod)
    if err != nil {
        logger.V(5).Info("Failed to compose allocation request for pod, skip",
            "pod", klog.KObj(pod), "error", err)
        return fwk.QueueSkip, nil
    }

    // The pod's total request, as composed by the allocator
    podTotalTflops := allocRequest.Request.Tflops
    podTotalVram := allocRequest.Request.Vram

    // Queue only if the resource increase covers the pod's request
    if increaseTflops.Cmp(podTotalTflops) >= 0 && increaseVram.Cmp(podTotalVram) >= 0 {
        logger.V(4).Info("GPU resource increase sufficient for pod, requeue unscheduled pod",
            "pod", klog.KObj(pod),
            "increaseTflops", increaseTflops.String(),
            "increaseVram", increaseVram.String(),
            "podRequestTflops", podTotalTflops.String(),
            "podRequestVram", podTotalVram.String())
        return fwk.Queue, nil
    }

    return fwk.QueueSkip, nil
}
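The hint's arithmetic leans on resource.Quantity semantics: Sub mutates its receiver, which is why the new availability is DeepCopy'd before the old value is subtracted, and Cmp returns -1, 0, or 1. A standalone sketch of the same increase-versus-request check:

package main

import (
    "fmt"

    "k8s.io/apimachinery/pkg/api/resource"
)

func main() {
    oldAvail := resource.MustParse("50")
    newAvail := resource.MustParse("150")

    // Sub mutates its receiver, so copy the new value before subtracting.
    increase := newAvail.DeepCopy()
    increase.Sub(oldAvail)

    request := resource.MustParse("100")
    // Cmp(request) >= 0 means the freed-up capacity covers the pod's request.
    fmt.Println(increase.String(), increase.Cmp(request) >= 0) // 100 true
}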
132 changes: 132 additions & 0 deletions internal/scheduler/gpuresources/gpuresources_test.go
@@ -737,3 +737,135 @@ func getPreFilterResult(state *framework.CycleState) []string {
    }
    return lo.Keys(data.(*GPUSchedulingStateData).NodeGPUs)
}

func (s *GPUResourcesSuite) TestEventsToRegister() {
    log.FromContext(s.ctx).Info("Running TestEventsToRegister")
    events, err := s.plugin.EventsToRegister(s.ctx)
    s.NoError(err)
    s.Len(events, 1)
    s.Equal(fwk.EventResource("gpus.v1.tensor-fusion.ai"), events[0].Event.Resource)
    s.Equal(fwk.Add|fwk.Update, events[0].Event.ActionType)
    s.NotNil(events[0].QueueingHintFn)
}

func (s *GPUResourcesSuite) TestQueueingHint_ResourceDecrease() {
    log.FromContext(s.ctx).Info("Running TestQueueingHint_ResourceDecrease")
    pod := s.makePod("test-pod", map[string]string{
        constants.TFLOPSRequestAnnotation: "100",
        constants.VRAMRequestAnnotation:   "10Gi",
        constants.GpuCountAnnotation:      "1",
    })

    oldGPU := &tfv1.GPU{
        Status: tfv1.GPUStatus{
            Available: &tfv1.Resource{
                Tflops: resource.MustParse("500"),
                Vram:   resource.MustParse("10Gi"),
            },
        },
    }
    newGPU := &tfv1.GPU{
        Status: tfv1.GPUStatus{
            Available: &tfv1.Resource{
                Tflops: resource.MustParse("400"),
                Vram:   resource.MustParse("8Gi"),
            },
        },
    }

    hint, err := s.plugin.queueingHint(*s.plugin.logger, pod, oldGPU, newGPU)
    s.NoError(err)
    s.Equal(fwk.QueueSkip, hint)
}

func (s *GPUResourcesSuite) TestQueueingHint_ResourceIncrease_Sufficient() {
    log.FromContext(s.ctx).Info("Running TestQueueingHint_ResourceIncrease_Sufficient")

    // Create a pending pod
    pendingPod := s.makePod("pending-pod", map[string]string{
        constants.TFLOPSRequestAnnotation: "100",
        constants.VRAMRequestAnnotation:   "10Gi",
        constants.GpuCountAnnotation:      "1",
        constants.GpuPoolKey:              "pool-a",
    })
    pendingPod.Spec.NodeName = "" // Make it pending

    oldGPU := &tfv1.GPU{
        Status: tfv1.GPUStatus{
            Available: &tfv1.Resource{
                Tflops: resource.MustParse("50"),
                Vram:   resource.MustParse("5Gi"),
            },
        },
    }
    newGPU := &tfv1.GPU{
        Status: tfv1.GPUStatus{
            Available: &tfv1.Resource{
                Tflops: resource.MustParse("150"), // Increase of 100
                Vram:   resource.MustParse("15Gi"), // Increase of 10Gi
            },
        },
    }

    hint, err := s.plugin.queueingHint(*s.plugin.logger, pendingPod, oldGPU, newGPU)
    s.NoError(err)
    s.Equal(fwk.Queue, hint)
}

func (s *GPUResourcesSuite) TestQueueingHint_NonTensorFusionPod() {
    log.FromContext(s.ctx).Info("Running TestQueueingHint_NonTensorFusionPod")
    nonTFPod := &v1.Pod{
        ObjectMeta: metav1.ObjectMeta{
            Name:      "non-tf-pod",
            Namespace: "default",
        },
    }

    oldGPU := &tfv1.GPU{
        Status: tfv1.GPUStatus{
            Available: &tfv1.Resource{
                Tflops: resource.MustParse("50"),
                Vram:   resource.MustParse("5Gi"),
            },
        },
    }
    newGPU := &tfv1.GPU{
        Status: tfv1.GPUStatus{
            Available: &tfv1.Resource{
                Tflops: resource.MustParse("150"),
                Vram:   resource.MustParse("15Gi"),
            },
        },
    }

    hint, err := s.plugin.queueingHint(*s.plugin.logger, nonTFPod, oldGPU, newGPU)
    s.NoError(err)
    s.Equal(fwk.QueueSkip, hint)
}

func (s *GPUResourcesSuite) TestQueueingHint_GPUAdd() {
    log.FromContext(s.ctx).Info("Running TestQueueingHint_GPUAdd")

    // Create a pending pod
    pendingPod := s.makePod("pending-pod-2", map[string]string{
        constants.TFLOPSRequestAnnotation: "50",
        constants.VRAMRequestAnnotation:   "5Gi",
        constants.GpuCountAnnotation:      "1",
        constants.GpuPoolKey:              "pool-a",
    })
    pendingPod.Spec.NodeName = ""

    // New GPU added with available resources
    newGPU := &tfv1.GPU{
        Status: tfv1.GPUStatus{
            Available: &tfv1.Resource{
                Tflops: resource.MustParse("100"), // More than the pod request (50)
                Vram:   resource.MustParse("10Gi"), // More than the pod request (5Gi)
            },
        },
    }

    hint, err := s.plugin.queueingHint(*s.plugin.logger, pendingPod, nil, newGPU)
    s.NoError(err)
    s.Equal(fwk.Queue, hint)
}
59 changes: 58 additions & 1 deletion internal/webhook/v1/pod_webhook_test.go
@@ -84,6 +84,9 @@ var _ = Describe("TensorFusionPodMutator", func() {
                    constants.InjectContainerAnnotation: "main",
                    constants.TFLOPSLimitAnnotation:     "100",
                    constants.VRAMLimitAnnotation:       "16Gi",

                    constants.TFLOPSRequestAnnotation: "10",
                    constants.VRAMRequestAnnotation:   "1Gi",
                },
                OwnerReferences: []metav1.OwnerReference{
                    {
@@ -119,6 +122,30 @@
        // Call mutator.Handle to process the admission request
        resp := mutator.Handle(ctx, req)
        Expect(resp.Allowed).To(BeTrue())
        Expect(resp.Patches).NotTo(BeEmpty())

        _, found := lo.Find(resp.Patches, func(patch jsonpatch.JsonPatchOperation) bool {
            return isAddOperation(patch) &&
                patch.Path == "/metadata/annotations/tensor-fusion.ai~1tflops-request"
        })
        Expect(found).To(BeFalse())
        _, found = lo.Find(resp.Patches, func(patch jsonpatch.JsonPatchOperation) bool {
            return isAddOperation(patch) &&
                patch.Path == "/metadata/annotations/tensor-fusion.ai~1vram-request"
        })
        Expect(found).To(BeFalse())
        op, found := lo.Find(resp.Patches, func(patch jsonpatch.JsonPatchOperation) bool {
            return isAddOperation(patch) &&
                patch.Path == "/metadata/annotations/tensor-fusion.ai~1isolation"
        })
        Expect(found).To(BeTrue())
        Expect(op.Value).To(Equal("soft"))
        op, found = lo.Find(resp.Patches, func(patch jsonpatch.JsonPatchOperation) bool {
            return isAddOperation(patch) &&
                patch.Path == "/spec/schedulerName"
        })
        Expect(found).To(BeTrue())
        Expect(op.Value).To(Equal("tensor-fusion-scheduler"))
    })
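The ~1 in these patch paths is RFC 6901 JSON Pointer escaping: a / inside an annotation key such as tensor-fusion.ai/tflops-request must be encoded as ~1 (with ~ encoded as ~0, applied first). A small sketch of deriving such a path; escapeJSONPointer is an illustrative helper, not part of the test suite:

package main

import (
    "fmt"
    "strings"
)

// escapeJSONPointer applies RFC 6901 escaping: "~" -> "~0" first, then "/" -> "~1".
func escapeJSONPointer(s string) string {
    return strings.ReplaceAll(strings.ReplaceAll(s, "~", "~0"), "/", "~1")
}

func main() {
    key := "tensor-fusion.ai/tflops-request"
    fmt.Println("/metadata/annotations/" + escapeJSONPointer(key))
    // Output: /metadata/annotations/tensor-fusion.ai~1tflops-request
}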

It("should successfully mutate a pod with TF resources", func() {
@@ -143,7 +170,6 @@
            },
        }
        Expect(k8sClient.Create(ctx, workloadProfile)).To(Succeed())

        pod := &corev1.Pod{
            ObjectMeta: metav1.ObjectMeta{
                Name:      "test-pod",
@@ -203,6 +229,37 @@
        Expect(resp.Allowed).To(BeTrue())
        Expect(resp.Patches).NotTo(BeEmpty())

        op, found := lo.Find(resp.Patches, func(patch jsonpatch.JsonPatchOperation) bool {
            return isAddOperation(patch) &&
                patch.Path == "/metadata/annotations/tensor-fusion.ai~1tflops-request"
        })
        Expect(found).To(BeTrue())
        Expect(op.Value).To(Equal("10"))
        op, found = lo.Find(resp.Patches, func(patch jsonpatch.JsonPatchOperation) bool {
            return isAddOperation(patch) &&
                patch.Path == "/metadata/annotations/tensor-fusion.ai~1vram-request"
        })
        Expect(found).To(BeTrue())
        Expect(op.Value).To(Equal("1Gi"))
        op, found = lo.Find(resp.Patches, func(patch jsonpatch.JsonPatchOperation) bool {
            return isAddOperation(patch) &&
                patch.Path == "/metadata/annotations/tensor-fusion.ai~1tflops-limit"
        })
        Expect(found).To(BeTrue())
        Expect(op.Value).To(Equal("100"))
        op, found = lo.Find(resp.Patches, func(patch jsonpatch.JsonPatchOperation) bool {
            return isAddOperation(patch) &&
                patch.Path == "/metadata/annotations/tensor-fusion.ai~1vram-limit"
        })
        Expect(found).To(BeTrue())
        Expect(op.Value).To(Equal("16Gi"))
        op, found = lo.Find(resp.Patches, func(patch jsonpatch.JsonPatchOperation) bool {
            return isAddOperation(patch) &&
                patch.Path == "/metadata/annotations/tensor-fusion.ai~1gpu-count"
        })
        Expect(found).To(BeTrue())
        Expect(op.Value).To(Equal("1"))

        // Check workload created
        Eventually(func(g Gomega) error {
            workload := &tfv1.TensorFusionWorkload{}