From db4094be1eb8e2c3e3b502e8048d44146f2e14d9 Mon Sep 17 00:00:00 2001 From: Joey <569475269@qq.com> Date: Fri, 7 Nov 2025 10:02:00 +0800 Subject: [PATCH 1/3] fix: index allocator, pod webhook priorityClass issue --- cmd/main.go | 5 ++-- .../scheduler/gpuresources/gpuresources.go | 29 ++++++++++--------- .../gpuresources/gpuresources_test.go | 4 +-- 3 files changed, 21 insertions(+), 17 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index ae36f8f1..d4baa199 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -247,7 +247,7 @@ func main() { pricingProvider := pricing.NewStaticPricingProvider() startWebhook(mgr, portAllocator, indexAllocator, pricingProvider) - scheduler, nodeExpander := startScheduler(ctx, allocator, mgr, k8sVersion) + scheduler, nodeExpander := startScheduler(ctx, allocator, indexAllocator, mgr, k8sVersion) startCustomResourceController(ctx, mgr, metricsRecorder, allocator, portAllocator, nodeExpander) @@ -500,6 +500,7 @@ func startWebhook( func startScheduler( ctx context.Context, allocator *gpuallocator.GpuAllocator, + indexAllocator *indexallocator.IndexAllocator, mgr manager.Manager, k8sVersion *k8sVer.Version, ) (*scheduler.Scheduler, *expander.NodeExpander) { @@ -513,7 +514,7 @@ func startScheduler( gpuResourceFitOpt := app.WithPlugin( gpuResourceFitPlugin.Name, - gpuResourceFitPlugin.NewWithDeps(allocator, mgr.GetClient()), + gpuResourceFitPlugin.NewWithDeps(allocator, mgr.GetClient(), indexAllocator), ) gpuTopoOpt := app.WithPlugin( gpuTopoPlugin.Name, diff --git a/internal/scheduler/gpuresources/gpuresources.go b/internal/scheduler/gpuresources/gpuresources.go index c3759fad..5b801203 100644 --- a/internal/scheduler/gpuresources/gpuresources.go +++ b/internal/scheduler/gpuresources/gpuresources.go @@ -13,6 +13,7 @@ import ( "github.com/NexusGPU/tensor-fusion/internal/config" "github.com/NexusGPU/tensor-fusion/internal/constants" "github.com/NexusGPU/tensor-fusion/internal/gpuallocator" + "github.com/NexusGPU/tensor-fusion/internal/indexallocator" "github.com/NexusGPU/tensor-fusion/internal/metrics" "github.com/NexusGPU/tensor-fusion/internal/quota" "github.com/NexusGPU/tensor-fusion/internal/utils" @@ -42,12 +43,13 @@ var _ framework.PostBindPlugin = &GPUFit{} var _ framework.EnqueueExtensions = &GPUFit{} type GPUFit struct { - logger *klog.Logger - fh framework.Handle - client client.Client - allocator *gpuallocator.GpuAllocator - ctx context.Context - cfg *config.GPUFitConfig + logger *klog.Logger + fh framework.Handle + client client.Client + allocator *gpuallocator.GpuAllocator + indexAllocator *indexallocator.IndexAllocator + ctx context.Context + cfg *config.GPUFitConfig } type GPUSchedulingStateData struct { @@ -80,7 +82,7 @@ func (p *GPUSchedulingStateData) Clone() fwk.StateData { type PluginFactoryFunc func(ctx context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) -func NewWithDeps(allocator *gpuallocator.GpuAllocator, client client.Client) PluginFactoryFunc { +func NewWithDeps(allocator *gpuallocator.GpuAllocator, client client.Client, indexAllocator *indexallocator.IndexAllocator) PluginFactoryFunc { return func(ctx context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) { target := &config.GPUFitConfig{} if unknown, ok := obj.(*runtime.Unknown); ok { @@ -91,12 +93,13 @@ func NewWithDeps(allocator *gpuallocator.GpuAllocator, client client.Client) Plu lh := klog.FromContext(ctx).WithValues("plugin", Name) lh.Info("Creating new GPUFit plugin") c := &GPUFit{ - logger: &lh, - fh: handle, - cfg: 
target, - allocator: allocator, - ctx: ctx, - client: client, + logger: &lh, + fh: handle, + cfg: target, + allocator: allocator, + indexAllocator: indexAllocator, + ctx: ctx, + client: client, } lh.Info("Created new GPUFit plugin", "plugin", c) diff --git a/internal/scheduler/gpuresources/gpuresources_test.go b/internal/scheduler/gpuresources/gpuresources_test.go index 5707a640..ef91b470 100644 --- a/internal/scheduler/gpuresources/gpuresources_test.go +++ b/internal/scheduler/gpuresources/gpuresources_test.go @@ -263,7 +263,7 @@ func (s *GPUResourcesSuite) SetupTest() { s.allocator.ReconcileAllocationState() s.allocator.SetAllocatorReady() - pluginFactory := NewWithDeps(s.allocator, s.client) + pluginFactory := NewWithDeps(s.allocator, s.client, nil) pluginConfig := &runtime.Unknown{ Raw: []byte(`{ "maxWorkerPerNode": 3, @@ -597,7 +597,7 @@ func (s *GPUResourcesSuite) makePod(name string, annotations map[string]string) func (s *GPUResourcesSuite) TestNewWithDeps() { log.FromContext(s.ctx).Info("Running TestNewWithDeps") - pluginFactory := NewWithDeps(s.allocator, s.client) + pluginFactory := NewWithDeps(s.allocator, s.client, nil) s.NotNil(pluginFactory) // Test with valid config From 8b34f9ebaa8271057f3748e2fc79479d06037343 Mon Sep 17 00:00:00 2001 From: Joey <569475269@qq.com> Date: Mon, 10 Nov 2025 17:32:23 +0800 Subject: [PATCH 2/3] fix: lint issue --- cmd/main.go | 5 ++-- .../scheduler/gpuresources/gpuresources.go | 29 +++++++++---------- .../gpuresources/gpuresources_test.go | 4 +-- 3 files changed, 17 insertions(+), 21 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index d4baa199..ae36f8f1 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -247,7 +247,7 @@ func main() { pricingProvider := pricing.NewStaticPricingProvider() startWebhook(mgr, portAllocator, indexAllocator, pricingProvider) - scheduler, nodeExpander := startScheduler(ctx, allocator, indexAllocator, mgr, k8sVersion) + scheduler, nodeExpander := startScheduler(ctx, allocator, mgr, k8sVersion) startCustomResourceController(ctx, mgr, metricsRecorder, allocator, portAllocator, nodeExpander) @@ -500,7 +500,6 @@ func startWebhook( func startScheduler( ctx context.Context, allocator *gpuallocator.GpuAllocator, - indexAllocator *indexallocator.IndexAllocator, mgr manager.Manager, k8sVersion *k8sVer.Version, ) (*scheduler.Scheduler, *expander.NodeExpander) { @@ -514,7 +513,7 @@ func startScheduler( gpuResourceFitOpt := app.WithPlugin( gpuResourceFitPlugin.Name, - gpuResourceFitPlugin.NewWithDeps(allocator, mgr.GetClient(), indexAllocator), + gpuResourceFitPlugin.NewWithDeps(allocator, mgr.GetClient()), ) gpuTopoOpt := app.WithPlugin( gpuTopoPlugin.Name, diff --git a/internal/scheduler/gpuresources/gpuresources.go b/internal/scheduler/gpuresources/gpuresources.go index 5b801203..c3759fad 100644 --- a/internal/scheduler/gpuresources/gpuresources.go +++ b/internal/scheduler/gpuresources/gpuresources.go @@ -13,7 +13,6 @@ import ( "github.com/NexusGPU/tensor-fusion/internal/config" "github.com/NexusGPU/tensor-fusion/internal/constants" "github.com/NexusGPU/tensor-fusion/internal/gpuallocator" - "github.com/NexusGPU/tensor-fusion/internal/indexallocator" "github.com/NexusGPU/tensor-fusion/internal/metrics" "github.com/NexusGPU/tensor-fusion/internal/quota" "github.com/NexusGPU/tensor-fusion/internal/utils" @@ -43,13 +42,12 @@ var _ framework.PostBindPlugin = &GPUFit{} var _ framework.EnqueueExtensions = &GPUFit{} type GPUFit struct { - logger *klog.Logger - fh framework.Handle - client client.Client - allocator 
*gpuallocator.GpuAllocator - indexAllocator *indexallocator.IndexAllocator - ctx context.Context - cfg *config.GPUFitConfig + logger *klog.Logger + fh framework.Handle + client client.Client + allocator *gpuallocator.GpuAllocator + ctx context.Context + cfg *config.GPUFitConfig } type GPUSchedulingStateData struct { @@ -82,7 +80,7 @@ func (p *GPUSchedulingStateData) Clone() fwk.StateData { type PluginFactoryFunc func(ctx context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) -func NewWithDeps(allocator *gpuallocator.GpuAllocator, client client.Client, indexAllocator *indexallocator.IndexAllocator) PluginFactoryFunc { +func NewWithDeps(allocator *gpuallocator.GpuAllocator, client client.Client) PluginFactoryFunc { return func(ctx context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) { target := &config.GPUFitConfig{} if unknown, ok := obj.(*runtime.Unknown); ok { @@ -93,13 +91,12 @@ func NewWithDeps(allocator *gpuallocator.GpuAllocator, client client.Client, ind lh := klog.FromContext(ctx).WithValues("plugin", Name) lh.Info("Creating new GPUFit plugin") c := &GPUFit{ - logger: &lh, - fh: handle, - cfg: target, - allocator: allocator, - indexAllocator: indexAllocator, - ctx: ctx, - client: client, + logger: &lh, + fh: handle, + cfg: target, + allocator: allocator, + ctx: ctx, + client: client, } lh.Info("Created new GPUFit plugin", "plugin", c) diff --git a/internal/scheduler/gpuresources/gpuresources_test.go b/internal/scheduler/gpuresources/gpuresources_test.go index ef91b470..5707a640 100644 --- a/internal/scheduler/gpuresources/gpuresources_test.go +++ b/internal/scheduler/gpuresources/gpuresources_test.go @@ -263,7 +263,7 @@ func (s *GPUResourcesSuite) SetupTest() { s.allocator.ReconcileAllocationState() s.allocator.SetAllocatorReady() - pluginFactory := NewWithDeps(s.allocator, s.client, nil) + pluginFactory := NewWithDeps(s.allocator, s.client) pluginConfig := &runtime.Unknown{ Raw: []byte(`{ "maxWorkerPerNode": 3, @@ -597,7 +597,7 @@ func (s *GPUResourcesSuite) makePod(name string, annotations map[string]string) func (s *GPUResourcesSuite) TestNewWithDeps() { log.FromContext(s.ctx).Info("Running TestNewWithDeps") - pluginFactory := NewWithDeps(s.allocator, s.client, nil) + pluginFactory := NewWithDeps(s.allocator, s.client) s.NotNil(pluginFactory) // Test with valid config From cfac49ddeebb7322b9bc3f46fba97f3dfeef026b Mon Sep 17 00:00:00 2001 From: Joey <569475269@qq.com> Date: Sat, 15 Nov 2025 22:57:42 +0800 Subject: [PATCH 3/3] fix: add allocation detail in gpunode/gpu custom resource --- api/v1/gpu_types.go | 13 +++ api/v1/gpunode_funcs.go | 2 +- api/v1/gpunode_types.go | 5 +- api/v1/zz_generated.deepcopy.go | 57 ++++++++++-- .../crds/tensor-fusion.ai_gpunodes.yaml | 90 +++++++++++++++---- .../crds/tensor-fusion.ai_gpus.yaml | 71 +++++++++++++++ .../crd/bases/tensor-fusion.ai_gpunodes.yaml | 90 +++++++++++++++---- config/crd/bases/tensor-fusion.ai_gpus.yaml | 71 +++++++++++++++ internal/controller/gpupool_controller.go | 11 +-- internal/gpuallocator/gpuallocator.go | 77 ++++++++++++---- internal/gpuallocator/node_capacity.go | 16 ++-- internal/utils/config.go | 4 +- 12 files changed, 431 insertions(+), 76 deletions(-) diff --git a/api/v1/gpu_types.go b/api/v1/gpu_types.go index 02c12c31..d59b747c 100644 --- a/api/v1/gpu_types.go +++ b/api/v1/gpu_types.go @@ -79,6 +79,19 @@ type RunningAppDetail struct { // Worker count Count int `json:"count"` + + // Pod names that are running this workload + // +optional + Pods 
[]*PodGPUInfo `json:"pods,omitempty"` +} + +type PodGPUInfo struct { + Name string `json:"name,omitempty"` + Namespace string `json:"namespace,omitempty"` + UID string `json:"uid,omitempty"` + Requests Resource `json:"requests,omitempty"` + Limits Resource `json:"limits,omitempty"` + QoS QoSLevel `json:"qos,omitempty"` } // +kubebuilder:validation:Enum=Pending;Provisioning;Running;Unknown;Destroying;Migrating diff --git a/api/v1/gpunode_funcs.go b/api/v1/gpunode_funcs.go index dbb95325..adde1086 100644 --- a/api/v1/gpunode_funcs.go +++ b/api/v1/gpunode_funcs.go @@ -10,7 +10,7 @@ func (node *GPUNode) InitializeStatus(initTFlops, initVRAM resource.Quantity, in TotalTFlops: initTFlops, TotalVRAM: initVRAM, TotalGPUs: initGPUs, - AllocationInfo: []*RunningAppDetail{}, + AllocatedPods: make(map[string][]*PodGPUInfo), LoadedModels: &[]string{}, ManagedGPUDeviceIDs: []string{}, ObservedGeneration: node.Generation, diff --git a/api/v1/gpunode_types.go b/api/v1/gpunode_types.go index 33e0201d..8c708663 100644 --- a/api/v1/gpunode_types.go +++ b/api/v1/gpunode_types.go @@ -96,7 +96,10 @@ type GPUNodeStatus struct { ObservedGeneration int64 `json:"observedGeneration,omitempty"` // +optional - AllocationInfo []*RunningAppDetail `json:"allocationInfo,omitempty"` + TotalGPUPods int32 `json:"totalGPUPods,omitempty"` + + // +optional + AllocatedPods map[string][]*PodGPUInfo `json:"allocatedPods,omitempty"` } // +kubebuilder:validation:Enum=Pending;Provisioning;Migrating;Running;Succeeded;Failed;Unknown;Destroying diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index 9a60fa01..110155a2 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -850,15 +850,26 @@ func (in *GPUNodeStatus) DeepCopyInto(out *GPUNodeStatus) { *out = make([]string, len(*in)) copy(*out, *in) } - if in.AllocationInfo != nil { - in, out := &in.AllocationInfo, &out.AllocationInfo - *out = make([]*RunningAppDetail, len(*in)) - for i := range *in { - if (*in)[i] != nil { - in, out := &(*in)[i], &(*out)[i] - *out = new(RunningAppDetail) - **out = **in + if in.AllocatedPods != nil { + in, out := &in.AllocatedPods, &out.AllocatedPods + *out = make(map[string][]*PodGPUInfo, len(*in)) + for key, val := range *in { + var outVal []*PodGPUInfo + if val == nil { + (*out)[key] = nil + } else { + inVal := (*in)[key] + in, out := &inVal, &outVal + *out = make([]*PodGPUInfo, len(*in)) + for i := range *in { + if (*in)[i] != nil { + in, out := &(*in)[i], &(*out)[i] + *out = new(PodGPUInfo) + (*in).DeepCopyInto(*out) + } + } } + (*out)[key] = outVal } } } @@ -1335,7 +1346,7 @@ func (in *GPUStatus) DeepCopyInto(out *GPUStatus) { if (*in)[i] != nil { in, out := &(*in)[i], &(*out)[i] *out = new(RunningAppDetail) - **out = **in + (*in).DeepCopyInto(*out) } } } @@ -1756,6 +1767,23 @@ func (in *PlacementConfig) DeepCopy() *PlacementConfig { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PodGPUInfo) DeepCopyInto(out *PodGPUInfo) { + *out = *in + in.Requests.DeepCopyInto(&out.Requests) + in.Limits.DeepCopyInto(&out.Limits) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PodGPUInfo. +func (in *PodGPUInfo) DeepCopy() *PodGPUInfo { + if in == nil { + return nil + } + out := new(PodGPUInfo) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *PoolComponentStatus) DeepCopyInto(out *PoolComponentStatus) { *out = *in @@ -1937,6 +1965,17 @@ func (in *Resources) DeepCopy() *Resources { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *RunningAppDetail) DeepCopyInto(out *RunningAppDetail) { *out = *in + if in.Pods != nil { + in, out := &in.Pods, &out.Pods + *out = make([]*PodGPUInfo, len(*in)) + for i := range *in { + if (*in)[i] != nil { + in, out := &(*in)[i], &(*out)[i] + *out = new(PodGPUInfo) + (*in).DeepCopyInto(*out) + } + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RunningAppDetail. diff --git a/charts/tensor-fusion/crds/tensor-fusion.ai_gpunodes.yaml b/charts/tensor-fusion/crds/tensor-fusion.ai_gpunodes.yaml index 796f7b9d..a50a3cd5 100644 --- a/charts/tensor-fusion/crds/tensor-fusion.ai_gpunodes.yaml +++ b/charts/tensor-fusion/crds/tensor-fusion.ai_gpunodes.yaml @@ -86,21 +86,78 @@ spec: status: description: GPUNodeStatus defines the observed state of GPUNode. properties: - allocationInfo: - items: - properties: - count: - description: Worker count - type: integer - name: - description: Workload name namespace - type: string - namespace: - type: string - required: - - count - type: object - type: array + allocatedPods: + additionalProperties: + items: + properties: + limits: + properties: + compute: + anyOf: + - type: integer + - type: string + description: 0-100 percentage, mutually exclusive with + TFLOPs + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + tflops: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + vram: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + required: + - tflops + - vram + type: object + name: + type: string + namespace: + type: string + qos: + enum: + - low + - medium + - high + - critical + type: string + requests: + properties: + compute: + anyOf: + - type: integer + - type: string + description: 0-100 percentage, mutually exclusive with + TFLOPs + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + tflops: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + vram: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + required: + - tflops + - vram + type: object + uid: + type: string + type: object + type: array + type: object availableTFlops: anyOf: - type: integer @@ -221,6 +278,9 @@ spec: - Unknown - Destroying type: string + totalGPUPods: + format: int32 + type: integer totalGPUs: format: int32 type: integer diff --git a/charts/tensor-fusion/crds/tensor-fusion.ai_gpus.yaml b/charts/tensor-fusion/crds/tensor-fusion.ai_gpus.yaml index 83d32bc2..50c76bce 100644 --- a/charts/tensor-fusion/crds/tensor-fusion.ai_gpus.yaml +++ 
b/charts/tensor-fusion/crds/tensor-fusion.ai_gpus.yaml @@ -159,6 +159,77 @@ spec: type: string namespace: type: string + pods: + description: Pod names that are running this workload + items: + properties: + limits: + properties: + compute: + anyOf: + - type: integer + - type: string + description: 0-100 percentage, mutually exclusive + with TFLOPs + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + tflops: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + vram: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + required: + - tflops + - vram + type: object + name: + type: string + namespace: + type: string + qos: + enum: + - low + - medium + - high + - critical + type: string + requests: + properties: + compute: + anyOf: + - type: integer + - type: string + description: 0-100 percentage, mutually exclusive + with TFLOPs + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + tflops: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + vram: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + required: + - tflops + - vram + type: object + uid: + type: string + type: object + type: array required: - count type: object diff --git a/config/crd/bases/tensor-fusion.ai_gpunodes.yaml b/config/crd/bases/tensor-fusion.ai_gpunodes.yaml index 796f7b9d..a50a3cd5 100644 --- a/config/crd/bases/tensor-fusion.ai_gpunodes.yaml +++ b/config/crd/bases/tensor-fusion.ai_gpunodes.yaml @@ -86,21 +86,78 @@ spec: status: description: GPUNodeStatus defines the observed state of GPUNode. 
properties: - allocationInfo: - items: - properties: - count: - description: Worker count - type: integer - name: - description: Workload name namespace - type: string - namespace: - type: string - required: - - count - type: object - type: array + allocatedPods: + additionalProperties: + items: + properties: + limits: + properties: + compute: + anyOf: + - type: integer + - type: string + description: 0-100 percentage, mutually exclusive with + TFLOPs + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + tflops: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + vram: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + required: + - tflops + - vram + type: object + name: + type: string + namespace: + type: string + qos: + enum: + - low + - medium + - high + - critical + type: string + requests: + properties: + compute: + anyOf: + - type: integer + - type: string + description: 0-100 percentage, mutually exclusive with + TFLOPs + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + tflops: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + vram: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + required: + - tflops + - vram + type: object + uid: + type: string + type: object + type: array + type: object availableTFlops: anyOf: - type: integer @@ -221,6 +278,9 @@ spec: - Unknown - Destroying type: string + totalGPUPods: + format: int32 + type: integer totalGPUs: format: int32 type: integer diff --git a/config/crd/bases/tensor-fusion.ai_gpus.yaml b/config/crd/bases/tensor-fusion.ai_gpus.yaml index 83d32bc2..50c76bce 100644 --- a/config/crd/bases/tensor-fusion.ai_gpus.yaml +++ b/config/crd/bases/tensor-fusion.ai_gpus.yaml @@ -159,6 +159,77 @@ spec: type: string namespace: type: string + pods: + description: Pod names that are running this workload + items: + properties: + limits: + properties: + compute: + anyOf: + - type: integer + - type: string + description: 0-100 percentage, mutually exclusive + with TFLOPs + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + tflops: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + vram: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + required: + - tflops + - vram + type: object + name: + type: string + namespace: + type: string + qos: + enum: + - low + - medium + - high + - critical + type: string + requests: + 
properties: + compute: + anyOf: + - type: integer + - type: string + description: 0-100 percentage, mutually exclusive + with TFLOPs + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + tflops: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + vram: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + required: + - tflops + - vram + type: object + uid: + type: string + type: object + type: array required: - count type: object diff --git a/internal/controller/gpupool_controller.go b/internal/controller/gpupool_controller.go index 1b7200ff..a823ba9f 100644 --- a/internal/controller/gpupool_controller.go +++ b/internal/controller/gpupool_controller.go @@ -289,7 +289,6 @@ func (r *GPUPoolReconciler) reconcilePoolCurrentCapacityAndReadiness( virtualAvailableTFlops := resource.Quantity{} runningAppsCnt := int32(0) - deduplicationMap := make(map[string]struct{}) for _, node := range nodes.Items { totalGPUs = totalGPUs + node.Status.TotalGPUs @@ -304,20 +303,14 @@ func (r *GPUPoolReconciler) reconcilePoolCurrentCapacityAndReadiness( availableVRAM.Add(node.Status.AvailableVRAM) availableTFlops.Add(node.Status.AvailableTFlops) + runningAppsCnt += node.Status.TotalGPUPods + if node.Status.VirtualAvailableVRAM != nil { virtualAvailableVRAM.Add(*node.Status.VirtualAvailableVRAM) } if node.Status.VirtualAvailableTFlops != nil { virtualAvailableTFlops.Add(*node.Status.VirtualAvailableTFlops) } - - for _, runningApp := range node.Status.AllocationInfo { - workloadIdentifier := runningApp.Name + "_" + runningApp.Namespace - if _, ok := deduplicationMap[workloadIdentifier]; !ok { - runningAppsCnt++ - deduplicationMap[workloadIdentifier] = struct{}{} - } - } } pool.Status.TotalGPUs = totalGPUs diff --git a/internal/gpuallocator/gpuallocator.go b/internal/gpuallocator/gpuallocator.go index 8e599e44..a32156da 100644 --- a/internal/gpuallocator/gpuallocator.go +++ b/internal/gpuallocator/gpuallocator.go @@ -23,6 +23,7 @@ import ( "github.com/NexusGPU/tensor-fusion/internal/utils" "github.com/samber/lo" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -320,7 +321,7 @@ func (s *GpuAllocator) Bind( } gpu.Status.Available.Vram.Sub(req.Request.Vram) - addRunningApp(s.ctx, gpu, req.WorkloadNameNamespace) + addRunningApp(s.ctx, gpu, req) s.markGPUDirty(key) } @@ -500,7 +501,7 @@ func (s *GpuAllocator) Dealloc( nodeName = storeGPU.Status.NodeSelector[constants.KubernetesHostNameLabel] } - removeRunningApp(s.ctx, storeGPU, workloadNameNamespace) + removeRunningApp(s.ctx, storeGPU, request) s.markGPUDirty(gpuNameNs) } @@ -1134,24 +1135,31 @@ func (s *GpuAllocator) SyncGPUsToK8s() { dirtyNodes[gpu.Labels[constants.LabelKeyOwner]] = struct{}{} - // Update the GPU status in Kubernetes - // using raw patch to avoid outdated revision conflicts - gpuToPatch := &tfv1.GPU{} - gpuToPatch.SetName(gpu.GetName()) - gpuToPatch.Status.Available = gpu.Status.Available - gpuToPatch.Status.RunningApps = gpu.Status.RunningApps - - if err := s.Status().Patch(s.ctx, gpuToPatch, 
client.MergeFrom(&tfv1.GPU{})); err != nil {
-			if errors.IsNotFound(err) {
-				// skip not existing resource to avoid infinite loop
-				log.V(6).Info("GPU not found, skipping update", "gpu", key.String())
-				continue
+		// Update the GPU status in Kubernetes with retry on conflict
+		// Get the latest version, update status, and retry on conflict
+		if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
+			// Get the latest version before attempting an update
+			latest := &tfv1.GPU{}
+			if err := s.Get(s.ctx, key, latest); err != nil {
+				if errors.IsNotFound(err) {
+					// skip non-existent resource to avoid infinite loop
+					log.V(6).Info("GPU not found, skipping update", "gpu", key.String())
+					return nil // return nil to stop retry
+				}
+				return err
 			}
-			// If update fails, put the GPU back in the dirty queue
+			// Apply our status updates to the latest version
+			latest.Status.Available = gpu.Status.Available
+			latest.Status.RunningApps = gpu.Status.RunningApps
+
+			// Attempt to update with the latest version
+			return s.Status().Update(s.ctx, latest)
+		}); err != nil {
+			// If update fails after retries, put the GPU back in the dirty queue
 			s.dirtyQueueLock.Lock()
 			s.dirtyQueue[key] = struct{}{}
 			s.dirtyQueueLock.Unlock()
-			log.Error(err, "Failed to patch GPU status, will retry later", "gpu", key.String())
+			log.Error(err, "Failed to update GPU status after retries, will retry later", "gpu", key.String())
 		}
 	}
 
@@ -1321,6 +1329,7 @@ func (s *GpuAllocator) reconcileAllocationState() {
 	})
 
 	actualAvailableMap := make(map[types.NamespacedName]*tfv1.Resource)
+	actualRunningAppsMap := make(map[types.NamespacedName][]*tfv1.RunningAppDetail)
 
 	s.storeMutex.Lock()
 	defer s.storeMutex.Unlock()
@@ -1328,7 +1337,9 @@ func (s *GpuAllocator) reconcileAllocationState() {
 	for gpuKey, gpu := range s.gpuStore {
 		if gpu.Status.Capacity != nil {
 			actualAvailableMap[gpuKey] = gpu.Status.Capacity.DeepCopy()
+			actualRunningAppsMap[gpuKey] = gpu.Status.RunningApps
 			gpu.Status.RunningApps = []*tfv1.RunningAppDetail{}
+
 		}
 
 		// This is important for progressive migration mode
@@ -1351,7 +1362,7 @@ func (s *GpuAllocator) reconcileAllocationState() {
 			gpuAvailableRes.Tflops.Sub(allocRequest.Request.Tflops)
 			gpuAvailableRes.Vram.Sub(allocRequest.Request.Vram)
 		}
-		addRunningApp(ctx, s.gpuStore[gpuKey], tfv1.NameNamespace{Namespace: worker.Namespace, Name: worker.Labels[constants.WorkloadKey]})
+		addRunningApp(ctx, s.gpuStore[gpuKey], allocRequest)
 	}
 }
 
@@ -1368,6 +1379,11 @@ func (s *GpuAllocator) reconcileAllocationState() {
 			s.markGPUDirtyLocked(gpuKey)
 			log.FromContext(ctx).Info("Correcting gpu available resources", "gpu", gpuKey.Name, "tflops", gpu.Status.Available.Tflops.String(), "vram", gpu.Status.Available.Vram.String())
 		}
+
+		if !equality.Semantic.DeepEqual(gpu.Status.RunningApps, actualRunningAppsMap[gpuKey]) {
+			s.markGPUDirtyLocked(gpuKey)
+			log.FromContext(ctx).Info("Correcting gpu running apps", "gpu", gpuKey.Name, "runningApps", len(gpu.Status.RunningApps))
+		}
 	}
 
 	// reconcile quota store state
@@ -1401,7 +1417,8 @@ func (s *GpuAllocator) startWorkerCleanUpChecker() {
 	}
 }
 
-func addRunningApp(ctx context.Context, gpu *tfv1.GPU, workloadNameNamespace tfv1.NameNamespace) {
+func addRunningApp(ctx context.Context, gpu *tfv1.GPU, allocRequest *tfv1.AllocRequest) {
+	workloadNameNamespace := allocRequest.WorkloadNameNamespace
 	if gpu == nil {
 		log.FromContext(ctx).Info("[Warning] GPU is nil, skip adding running app", "workload", workloadNameNamespace.Name, "namespace", workloadNameNamespace.Namespace)
 		return
 	}
@@ -1416,16 +1433,35 @@ func addRunningApp(ctx context.Context, gpu *tfv1.GPU, workloadNameNamespace tfv
 
 	if found {
 		item.Count++
+		item.Pods = append(item.Pods, &tfv1.PodGPUInfo{
+			Name:      allocRequest.PodMeta.Name,
+			Namespace: allocRequest.PodMeta.Namespace,
+			UID:       string(allocRequest.PodMeta.UID),
+			Requests:  allocRequest.Request,
+			Limits:    allocRequest.Limit,
+			QoS:       allocRequest.QoS,
+		})
 	} else {
 		gpu.Status.RunningApps = append(gpu.Status.RunningApps, &tfv1.RunningAppDetail{
 			Name:      workloadNameNamespace.Name,
 			Namespace: workloadNameNamespace.Namespace,
 			Count:     1,
+			Pods: []*tfv1.PodGPUInfo{
+				{
+					Name:      allocRequest.PodMeta.Name,
+					Namespace: allocRequest.PodMeta.Namespace,
+					UID:       string(allocRequest.PodMeta.UID),
+					Requests:  allocRequest.Request,
+					Limits:    allocRequest.Limit,
+					QoS:       allocRequest.QoS,
+				},
+			},
 		})
 	}
 }
 
-func removeRunningApp(ctx context.Context, gpu *tfv1.GPU, workloadNameNamespace tfv1.NameNamespace) {
+func removeRunningApp(ctx context.Context, gpu *tfv1.GPU, allocRequest *tfv1.AllocRequest) {
+	workloadNameNamespace := allocRequest.WorkloadNameNamespace
 	item, found := lo.Find(gpu.Status.RunningApps, func(app *tfv1.RunningAppDetail) bool {
 		return app.Name == workloadNameNamespace.Name && app.Namespace == workloadNameNamespace.Namespace
 	})
@@ -1436,6 +1471,10 @@ func removeRunningApp(ctx context.Context, gpu *tfv1.GPU, workloadNameNamespace
 		gpu.Status.RunningApps = lo.Filter(gpu.Status.RunningApps, func(app *tfv1.RunningAppDetail, _ int) bool {
 			return app.Name != workloadNameNamespace.Name && app.Namespace != workloadNameNamespace.Namespace
 		})
+	} else {
+		item.Pods = lo.Filter(item.Pods, func(pod *tfv1.PodGPUInfo, _ int) bool {
+			return pod.UID != string(allocRequest.PodMeta.UID)
+		})
 	}
 } else {
 	// should not happen, if deallocation twice, it should be a bug
diff --git a/internal/gpuallocator/node_capacity.go b/internal/gpuallocator/node_capacity.go
index 56b79ace..33e76f50 100644
--- a/internal/gpuallocator/node_capacity.go
+++ b/internal/gpuallocator/node_capacity.go
@@ -36,10 +36,10 @@ func RefreshGPUNodeCapacity(
 	node.Status.AvailableTFlops = resource.Quantity{}
 	node.Status.TotalTFlops = resource.Quantity{}
 	node.Status.TotalVRAM = resource.Quantity{}
-	node.Status.AllocationInfo = []*tfv1.RunningAppDetail{}
+	node.Status.AllocatedPods = make(map[string][]*tfv1.PodGPUInfo)
+	nodeGPUPodSet := make(map[string]struct{})
 
 	gpuModels := []string{}
-	deduplicationMap := make(map[string]struct{})
 
 	for _, gpu := range gpuList.Items {
 		if gpu.Status.Available == nil || gpu.Status.Capacity == nil {
@@ -51,10 +51,13 @@ func RefreshGPUNodeCapacity(
 		node.Status.TotalTFlops.Add(gpu.Status.Capacity.Tflops)
 		gpuModels = append(gpuModels, gpu.Status.GPUModel)
 
+		if _, ok := node.Status.AllocatedPods[gpu.Name]; !ok {
+			node.Status.AllocatedPods[gpu.Name] = []*tfv1.PodGPUInfo{}
+		}
 		for _, runningApp := range gpu.Status.RunningApps {
-			if _, ok := deduplicationMap[runningApp.Name+"_"+runningApp.Namespace]; !ok {
-				node.Status.AllocationInfo = append(node.Status.AllocationInfo, runningApp.DeepCopy())
-				deduplicationMap[runningApp.Name+"_"+runningApp.Namespace] = struct{}{}
+			for _, pod := range runningApp.Pods {
+				nodeGPUPodSet[pod.UID] = struct{}{}
+				node.Status.AllocatedPods[gpu.Name] = append(node.Status.AllocatedPods[gpu.Name], pod)
 			}
 		}
 	}
@@ -62,6 +67,7 @@ func RefreshGPUNodeCapacity(
 	virtualVRAM, virtualTFlops := calculateVirtualCapacity(node, pool)
 	node.Status.VirtualTFlops = virtualTFlops
 	node.Status.VirtualVRAM = virtualVRAM
+	node.Status.TotalGPUPods = int32(len(nodeGPUPodSet))
 
 	vramAvailable := virtualVRAM.DeepCopy()
 	tflopsAvailable := virtualTFlops.DeepCopy()
diff --git a/internal/utils/config.go b/internal/utils/config.go
index b7442644..ba2e732b 100644
--- a/internal/utils/config.go
+++ b/internal/utils/config.go
@@ -137,12 +137,12 @@ func GetGPUResource(pod *corev1.Pod, isRequest bool) (tfv1.Resource, error) {
 	if tflopsErr == nil && percentErr == nil {
 		return tfv1.Resource{}, fmt.Errorf("tflops and compute-percent are mutually exclusive, please specify only one")
 	} else if tflopsErr != nil && percentErr != nil {
-		return tfv1.Resource{}, fmt.Errorf("failed to parse tflops or compute-percent, must specify one: %w", tflopsErr)
+		ctrl.Log.Info("failed to parse tflops and compute-percent, no compute request/limit set", "pod", pod.Name, "namespace", pod.Namespace)
 	}
 
 	vram, vramErr := resource.ParseQuantity(pod.Annotations[vramKey])
 	if vramErr != nil {
-		return tfv1.Resource{}, vramErr
+		ctrl.Log.Info("failed to parse vram annotation, treating it as unset", "pod", pod.Name, "namespace", pod.Namespace, "annotation", vramKey)
 	}
 
 	return tfv1.Resource{
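
Illustrative sketch (not part of the patch series above, and using trimmed stand-in types rather than the real api/v1 package): it mimics how RefreshGPUNodeCapacity in PATCH 3/3 fills the new GPUNode status fields. allocatedPods keeps per-GPU pod detail keyed by GPU name, while totalGPUPods counts distinct pod UIDs, so a pod spanning multiple GPUs is counted once.

package main

import "fmt"

// Trimmed stand-ins for the api/v1 types added in PATCH 3/3; only the
// fields needed for this demo are mirrored here.
type PodGPUInfo struct {
	Name      string
	Namespace string
	UID       string
}

type GPUNodeStatus struct {
	TotalGPUPods  int32
	AllocatedPods map[string][]*PodGPUInfo // keyed by GPU name
}

func main() {
	// One pod spans two GPUs: it is listed under both GPU keys in
	// AllocatedPods, but TotalGPUPods deduplicates by pod UID.
	shared := &PodGPUInfo{Name: "trainer-0", Namespace: "default", UID: "uid-1"}
	status := GPUNodeStatus{
		AllocatedPods: map[string][]*PodGPUInfo{
			"node1-gpu-0": {shared},
			"node1-gpu-1": {shared, {Name: "infer-0", Namespace: "default", UID: "uid-2"}},
		},
	}

	// Mirror RefreshGPUNodeCapacity: collect distinct pod UIDs across GPUs.
	seen := map[string]struct{}{}
	for gpuName, pods := range status.AllocatedPods {
		for _, p := range pods {
			seen[p.UID] = struct{}{}
			fmt.Printf("GPU %s -> %s/%s\n", gpuName, p.Namespace, p.Name)
		}
	}
	status.TotalGPUPods = int32(len(seen))
	fmt.Println("totalGPUPods:", status.TotalGPUPods) // prints 2, not 3
}

Running this prints each GPU-to-pod mapping and totalGPUPods: 2, even though three pod entries appear across the two GPU keys.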