From 2c2008601b932ebb6a0908d9e7a0da8c6857ecc6 Mon Sep 17 00:00:00 2001 From: Joey <569475269@qq.com> Date: Fri, 7 Nov 2025 10:02:00 +0800 Subject: [PATCH 1/9] fix: index allocator, pod webhook priorityClass issue --- cmd/main.go | 29 +- internal/constants/constants.go | 3 + internal/indexallocator/indexallocator.go | 272 ++++++++++++++++++ .../indexallocator/indexallocator_test.go | 130 +++++++++ .../scheduler/gpuresources/gpuresources.go | 72 +++-- .../gpuresources/gpuresources_test.go | 4 +- internal/server/router/assign_index.go | 64 +++++ internal/server/server.go | 14 + internal/webhook/v1/pod_webhook.go | 114 +++++++- internal/webhook/v1/pod_webhook_test.go | 2 +- internal/webhook/v1/webhook_suite_test.go | 2 +- 11 files changed, 669 insertions(+), 37 deletions(-) create mode 100644 internal/indexallocator/indexallocator.go create mode 100644 internal/indexallocator/indexallocator_test.go create mode 100644 internal/server/router/assign_index.go diff --git a/cmd/main.go b/cmd/main.go index 216b3ff7..a4bce332 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -36,6 +36,7 @@ import ( "github.com/NexusGPU/tensor-fusion/internal/constants" "github.com/NexusGPU/tensor-fusion/internal/controller" "github.com/NexusGPU/tensor-fusion/internal/gpuallocator" + "github.com/NexusGPU/tensor-fusion/internal/indexallocator" "github.com/NexusGPU/tensor-fusion/internal/metrics" "github.com/NexusGPU/tensor-fusion/internal/portallocator" "github.com/NexusGPU/tensor-fusion/internal/scheduler/expander" @@ -232,17 +233,25 @@ func main() { // Initialize GPU allocator and set up watches allocator, portAllocator := startTensorFusionAllocators(ctx, mgr) + // Initialize Index allocator for Device Plugin communication + indexAllocator, err := indexallocator.NewIndexAllocator(ctx, mgr.GetClient()) + if err != nil { + setupLog.Error(err, "unable to set up index allocator") + os.Exit(1) + } + _ = indexAllocator.SetupWithManager(ctx, mgr) + startAutoScaler(mgr, allocator) // Create pricing provider for webhook pricingProvider := pricing.NewStaticPricingProvider() - startWebhook(mgr, portAllocator, pricingProvider) + startWebhook(mgr, portAllocator, indexAllocator, pricingProvider) - scheduler, nodeExpander := startScheduler(ctx, allocator, mgr, k8sVersion) + scheduler, nodeExpander := startScheduler(ctx, allocator, indexAllocator, mgr, k8sVersion) startCustomResourceController(ctx, mgr, metricsRecorder, allocator, portAllocator, nodeExpander) - startHttpServerForTFClient(ctx, kc, portAllocator, allocator, scheduler, mgr.Elected()) + startHttpServerForTFClient(ctx, kc, portAllocator, indexAllocator, allocator, scheduler, mgr.Elected()) // +kubebuilder:scaffold:builder addHealthCheckAPI(mgr) @@ -291,6 +300,7 @@ func startHttpServerForTFClient( ctx context.Context, kc *rest.Config, portAllocator *portallocator.PortAllocator, + indexAllocator *indexallocator.IndexAllocator, allocator *gpuallocator.GpuAllocator, scheduler *scheduler.Scheduler, leaderChan <-chan struct{}, @@ -310,12 +320,17 @@ func startHttpServerForTFClient( setupLog.Error(err, "failed to create assign host port router") os.Exit(1) } + assignIndexRouter, err := router.NewAssignIndexRouter(ctx, indexAllocator) + if err != nil { + setupLog.Error(err, "failed to create assign index router") + os.Exit(1) + } allocatorInfoRouter, err := router.NewAllocatorInfoRouter(ctx, allocator, scheduler) if err != nil { setupLog.Error(err, "failed to create allocator info router") os.Exit(1) } - httpServer := server.NewHTTPServer(connectionRouter, assignHostPortRouter, 
allocatorInfoRouter, leaderChan) + httpServer := server.NewHTTPServer(connectionRouter, assignHostPortRouter, assignIndexRouter, allocatorInfoRouter, leaderChan) go func() { err := httpServer.Run() if err != nil { @@ -468,12 +483,13 @@ func startCustomResourceController( func startWebhook( mgr manager.Manager, portAllocator *portallocator.PortAllocator, + indexAllocator *indexallocator.IndexAllocator, pricingProvider pricing.PricingProvider, ) { if os.Getenv(constants.EnableWebhookEnv) == constants.FalseStringValue { return } - if err := webhookcorev1.SetupPodWebhookWithManager(mgr, portAllocator, pricingProvider); err != nil { + if err := webhookcorev1.SetupPodWebhookWithManager(mgr, portAllocator, indexAllocator, pricingProvider); err != nil { setupLog.Error(err, "unable to create webhook", "webhook", "Pod") os.Exit(1) } @@ -482,6 +498,7 @@ func startWebhook( func startScheduler( ctx context.Context, allocator *gpuallocator.GpuAllocator, + indexAllocator *indexallocator.IndexAllocator, mgr manager.Manager, k8sVersion *k8sVer.Version, ) (*scheduler.Scheduler, *expander.NodeExpander) { @@ -495,7 +512,7 @@ func startScheduler( gpuResourceFitOpt := app.WithPlugin( gpuResourceFitPlugin.Name, - gpuResourceFitPlugin.NewWithDeps(allocator, mgr.GetClient()), + gpuResourceFitPlugin.NewWithDeps(allocator, mgr.GetClient(), indexAllocator), ) gpuTopoOpt := app.WithPlugin( gpuTopoPlugin.Name, diff --git a/internal/constants/constants.go b/internal/constants/constants.go index 91ad841e..a2705eda 100644 --- a/internal/constants/constants.go +++ b/internal/constants/constants.go @@ -92,6 +92,9 @@ const ( // Additional worker pod template is set by user with /worker-pod-template annotation WorkerPodTemplateAnnotation = Domain + "/worker-pod-template" + // Pod index annotation for Device Plugin communication (1-512) + PodIndexAnnotation = Domain + "/index" + WorkloadModeAnnotation = Domain + "/workload-mode" WorkloadModeDynamic = "dynamic" WorkloadModeFixed = "fixed" diff --git a/internal/indexallocator/indexallocator.go b/internal/indexallocator/indexallocator.go new file mode 100644 index 00000000..7ad038ac --- /dev/null +++ b/internal/indexallocator/indexallocator.go @@ -0,0 +1,272 @@ +package indexallocator + +import ( + "context" + "fmt" + "math/bits" + "strconv" + "sync" + "time" + + "github.com/NexusGPU/tensor-fusion/internal/constants" + "github.com/NexusGPU/tensor-fusion/internal/utils" + v1 "k8s.io/api/core/v1" + "k8s.io/client-go/util/retry" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/manager" +) + +const ( + IndexRangeStart = 1 + IndexRangeEnd = 512 + IndexBitmapSize = (IndexRangeEnd - IndexRangeStart + 63) / 64 +) + +const RELEASE_INDEX_RETRY_INTERVAL = 10 * time.Second + +var RETRY_CONFIG = wait.Backoff{ + Steps: 100, + Duration: RELEASE_INDEX_RETRY_INTERVAL, + Factor: 1.1, + Jitter: 0.1, +} + +// IndexAllocator manages allocation of 1-512 temporary indices for Pod-to-DevicePlugin communication +type IndexAllocator struct { + IsLeader bool + + Bitmap []uint64 + + Client client.Client + + storeMutex sync.RWMutex + ctx context.Context + + indexReleaseQueue chan struct { + podName string + index int + } +} + +func NewIndexAllocator(ctx context.Context, client client.Client) (*IndexAllocator, error) { + if client == nil { + return nil, fmt.Errorf("client cannot be 
nil") + } + + allocator := &IndexAllocator{ + Client: client, + IsLeader: false, + Bitmap: make([]uint64, IndexBitmapSize), + + storeMutex: sync.RWMutex{}, + ctx: ctx, + + indexReleaseQueue: make(chan struct { + podName string + index int + }), + } + + go allocator.releaseIndexUntilPodDeleted() + + return allocator, nil +} + +func (s *IndexAllocator) SetupWithManager(ctx context.Context, mgr manager.Manager) <-chan struct{} { + readyCh := make(chan struct{}, 1) + _ = mgr.Add(manager.RunnableFunc(func(ctx context.Context) error { + <-mgr.Elected() + s.IsLeader = true + leaderInfo := &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: constants.LeaderInfoConfigMapName, + Namespace: utils.CurrentNamespace(), + }, + } + err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { + _, err := controllerutil.CreateOrUpdate(ctx, s.Client, leaderInfo, func() error { + leaderInfo.Data = map[string]string{ + constants.LeaderInfoConfigMapLeaderIPKey: utils.CurrentIP(), + } + return nil + }) + return err + }) + if err != nil { + log.FromContext(ctx).Error(err, "Failed to update leader IP info in ConfigMap") + } + + s.storeMutex.Lock() + defer s.storeMutex.Unlock() + + // Initialize bitmap from existing pods with index annotation + s.initBitmap(ctx) + + readyCh <- struct{}{} + return nil + })) + return readyCh +} + +func (s *IndexAllocator) GetLeaderIP() string { + leaderInfo := &v1.ConfigMap{} + err := s.Client.Get(context.Background(), client.ObjectKey{ + Name: constants.LeaderInfoConfigMapName, + Namespace: utils.CurrentNamespace(), + }, leaderInfo) + if err != nil { + log.FromContext(context.Background()).Error(err, "Failed to get leader IP info from ConfigMap") + return "" + } + if leaderInfo.Data == nil { + return "" + } + return leaderInfo.Data[constants.LeaderInfoConfigMapLeaderIPKey] +} + +// AssignIndex assigns a temporary index (1-512) for Pod-to-DevicePlugin communication +// Uses distributed lock via leader election to ensure global increment +// Index is assigned in ascending order (1, 2, 3, ...) to maintain consistency +func (s *IndexAllocator) AssignIndex(podName string) (int, error) { + if !s.IsLeader { + return 0, fmt.Errorf("only leader can assign index") + } + + s.storeMutex.Lock() + defer s.storeMutex.Unlock() + + // Find first available index in ascending order (1, 2, 3, ...) 
+ // This ensures consistent index assignment across distributed webhook instances + // TrailingZeros64 finds the first zero bit (lowest available index) + for i, subMap := range s.Bitmap { + bitPos := bits.TrailingZeros64(^subMap) + indexOffset := i*64 + bitPos + if subMap != 0xFFFFFFFFFFFFFFFF { + assignedIndex := indexOffset + IndexRangeStart + if assignedIndex <= IndexRangeEnd { + // Mark this index as used + s.Bitmap[i] = subMap | (1 << bitPos) + return assignedIndex, nil + } else { + break + } + } + } + + // If all indices in first pass are used, wrap around and find first available + // This handles the case when 512 is reached and we need to reuse released indices + for i := 0; i < IndexBitmapSize; i++ { + for j := 0; j < 64; j++ { + indexOffset := i*64 + j + assignedIndex := indexOffset + IndexRangeStart + if assignedIndex > IndexRangeEnd { + break + } + // Check if this index is available (bit is 0) + if s.Bitmap[i]&(1< IndexRangeEnd { + return fmt.Errorf("index %d out of range [%d, %d]", index, IndexRangeStart, IndexRangeEnd) + } + + indexOffset := index - IndexRangeStart + + if immediateRelease { + s.storeMutex.Lock() + defer s.storeMutex.Unlock() + s.Bitmap[indexOffset/64] &^= 1 << (indexOffset % 64) + return nil + } else { + // Put into queue, release until Pod not found + s.indexReleaseQueue <- struct { + podName string + index int + }{podName, index} + } + return nil +} + +func (s *IndexAllocator) releaseIndexUntilPodDeleted() { + for item := range s.indexReleaseQueue { + podName := item.podName + indexOffset := item.index - IndexRangeStart + + _ = retry.OnError(RETRY_CONFIG, func(_ error) bool { + return true + }, func() error { + // Try to get pod by name from any namespace + podList := &v1.PodList{} + err := s.Client.List(s.ctx, podList) + if err != nil { + return err + } + + found := false + for i := range podList.Items { + if podList.Items[i].Name == podName { + found = true + break + } + } + + if !found { + s.storeMutex.Lock() + defer s.storeMutex.Unlock() + s.Bitmap[indexOffset/64] &^= 1 << (indexOffset % 64) + return nil + } + return fmt.Errorf("pod still there, cannot release index %d for pod %s", item.index, podName) + }) + } +} + +func (s *IndexAllocator) initBitmap(ctx context.Context) { + log := log.FromContext(ctx) + podList := &v1.PodList{} + err := s.Client.List(ctx, podList) + if err != nil { + log.Error(err, "failed to list pods for index bitmap initialization") + return + } + + for _, pod := range podList.Items { + if pod.Annotations == nil { + continue + } + indexStr, exists := pod.Annotations[constants.PodIndexAnnotation] + if !exists || indexStr == "" { + continue + } + index, err := strconv.Atoi(indexStr) + if err != nil { + log.V(5).Info("failed to parse index annotation", "pod", pod.Name, "index", indexStr) + continue + } + if index < IndexRangeStart || index > IndexRangeEnd { + log.V(5).Info("index out of range", "pod", pod.Name, "index", index) + continue + } + + // Check if pod is still running/starting, if not, don't mark as used + if pod.DeletionTimestamp != nil || pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed { + continue + } + + indexOffset := index - IndexRangeStart + s.Bitmap[indexOffset/64] |= 1 << (indexOffset % 64) + } +} diff --git a/internal/indexallocator/indexallocator_test.go b/internal/indexallocator/indexallocator_test.go new file mode 100644 index 00000000..9e876735 --- /dev/null +++ b/internal/indexallocator/indexallocator_test.go @@ -0,0 +1,130 @@ +package indexallocator + +import ( + "context" + 
"testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client/fake" +) + +func TestIndexAllocator_AssignIndex(t *testing.T) { + scheme := fake.NewClientBuilder().WithScheme(fake.NewClientBuilder().Build().Scheme()).Build().Scheme() + client := fake.NewClientBuilder().WithScheme(scheme).Build() + + ctx := context.Background() + allocator, err := NewIndexAllocator(ctx, client) + require.NoError(t, err) + allocator.IsLeader = true + + // Test assigning first index + index1, err := allocator.AssignIndex("pod-1") + assert.NoError(t, err) + assert.Equal(t, 1, index1) + + // Test assigning second index + index2, err := allocator.AssignIndex("pod-2") + assert.NoError(t, err) + assert.Equal(t, 2, index2) + + // Test assigning multiple indices - verify ascending order + for i := 3; i <= 10; i++ { + index, err := allocator.AssignIndex("pod-" + string(rune(i))) + assert.NoError(t, err) + assert.Equal(t, i, index, "index should be assigned in ascending order") + } +} + +func TestIndexAllocator_AssignIndex_IncrementalOrder(t *testing.T) { + scheme := fake.NewClientBuilder().WithScheme(fake.NewClientBuilder().Build().Scheme()).Build().Scheme() + client := fake.NewClientBuilder().WithScheme(scheme).Build() + + ctx := context.Background() + allocator, err := NewIndexAllocator(ctx, client) + require.NoError(t, err) + allocator.IsLeader = true + + // Test that indices are assigned in incremental order (1, 2, 3, ...) + expectedIndex := 1 + for i := 0; i < 20; i++ { + index, err := allocator.AssignIndex("pod-" + string(rune(i))) + assert.NoError(t, err) + assert.Equal(t, expectedIndex, index, "index should be assigned in ascending order") + expectedIndex++ + } +} + +func TestIndexAllocator_ReleaseIndex(t *testing.T) { + scheme := fake.NewClientBuilder().WithScheme(fake.NewClientBuilder().Build().Scheme()).Build().Scheme() + client := fake.NewClientBuilder().WithScheme(scheme).Build() + + ctx := context.Background() + allocator, err := NewIndexAllocator(ctx, client) + require.NoError(t, err) + allocator.IsLeader = true + + // Assign an index + index, err := allocator.AssignIndex("pod-1") + require.NoError(t, err) + + // Release it immediately + err = allocator.ReleaseIndex("pod-1", index, true) + assert.NoError(t, err) + + // Should be able to assign the same index again (wrapped around) + index2, err := allocator.AssignIndex("pod-2") + assert.NoError(t, err) + assert.Equal(t, index, index2, "released index should be available for reuse") +} + +func TestIndexAllocator_InitBitmap(t *testing.T) { + scheme := fake.NewClientBuilder().WithScheme(fake.NewClientBuilder().Build().Scheme()).Build().Scheme() + + // Create pods with index annotations + pod1 := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + Namespace: "default", + Annotations: map[string]string{ + "tensor-fusion.ai/index": "1", + }, + }, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + }, + } + pod2 := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-2", + Namespace: "default", + Annotations: map[string]string{ + "tensor-fusion.ai/index": "5", + }, + }, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + }, + } + + client := fake.NewClientBuilder().WithScheme(scheme).WithObjects(pod1, pod2).Build() + + ctx := context.Background() + allocator, err := NewIndexAllocator(ctx, client) + require.NoError(t, err) + + // Initialize bitmap + allocator.initBitmap(ctx) + + // Check that indices 1 and 5 
are marked as used + assert.True(t, allocator.Bitmap[0]&(1<<0) != 0) // index 1 + assert.True(t, allocator.Bitmap[0]&(1<<4) != 0) // index 5 + + // Next assignment should start from 2 (first available after 1 and 5) + allocator.IsLeader = true + index, err := allocator.AssignIndex("pod-3") + assert.NoError(t, err) + assert.Equal(t, 2, index, "should assign first available index (2) after 1 and 5 are used") +} diff --git a/internal/scheduler/gpuresources/gpuresources.go b/internal/scheduler/gpuresources/gpuresources.go index 3f3b6f12..9c129c32 100644 --- a/internal/scheduler/gpuresources/gpuresources.go +++ b/internal/scheduler/gpuresources/gpuresources.go @@ -13,6 +13,7 @@ import ( "github.com/NexusGPU/tensor-fusion/internal/config" "github.com/NexusGPU/tensor-fusion/internal/constants" "github.com/NexusGPU/tensor-fusion/internal/gpuallocator" + "github.com/NexusGPU/tensor-fusion/internal/indexallocator" "github.com/NexusGPU/tensor-fusion/internal/metrics" "github.com/NexusGPU/tensor-fusion/internal/quota" "github.com/NexusGPU/tensor-fusion/internal/utils" @@ -42,12 +43,13 @@ var _ framework.PostBindPlugin = &GPUFit{} var _ framework.EnqueueExtensions = &GPUFit{} type GPUFit struct { - logger *klog.Logger - fh framework.Handle - client client.Client - allocator *gpuallocator.GpuAllocator - ctx context.Context - cfg *config.GPUFitConfig + logger *klog.Logger + fh framework.Handle + client client.Client + allocator *gpuallocator.GpuAllocator + indexAllocator *indexallocator.IndexAllocator + ctx context.Context + cfg *config.GPUFitConfig } type GPUSchedulingStateData struct { @@ -80,7 +82,7 @@ func (p *GPUSchedulingStateData) Clone() fwk.StateData { type PluginFactoryFunc func(ctx context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) -func NewWithDeps(allocator *gpuallocator.GpuAllocator, client client.Client) PluginFactoryFunc { +func NewWithDeps(allocator *gpuallocator.GpuAllocator, client client.Client, indexAllocator *indexallocator.IndexAllocator) PluginFactoryFunc { return func(ctx context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) { target := &config.GPUFitConfig{} if unknown, ok := obj.(*runtime.Unknown); ok { @@ -91,12 +93,13 @@ func NewWithDeps(allocator *gpuallocator.GpuAllocator, client client.Client) Plu lh := klog.FromContext(ctx).WithValues("plugin", Name) lh.Info("Creating new GPUFit plugin") c := &GPUFit{ - logger: &lh, - fh: handle, - cfg: target, - allocator: allocator, - ctx: ctx, - client: client, + logger: &lh, + fh: handle, + cfg: target, + allocator: allocator, + indexAllocator: indexAllocator, + ctx: ctx, + client: client, } lh.Info("Created new GPUFit plugin", "plugin", c) @@ -431,6 +434,15 @@ func (s *GPUFit) Reserve(ctx context.Context, state fwk.CycleState, pod *v1.Pod, if err != nil { return fwk.NewStatus(fwk.Error, err.Error()) } + + // Index is already assigned in webhook stage, scheduler cannot modify Pod + // Just verify that index annotation exists for logging + if pod.Annotations != nil { + if indexStr, exists := pod.Annotations[constants.PodIndexAnnotation]; exists && indexStr != "" { + s.logger.V(5).Info("Pod index already assigned in webhook", "pod", pod.Name, "index", indexStr) + } + } + return fwk.NewStatus(fwk.Success, "") } @@ -467,12 +479,36 @@ func (s *GPUFit) PostBind(ctx context.Context, state fwk.CycleState, pod *v1.Pod // write the allocated GPU info to Pod in bindingCycle, before default binder changing the Pod nodeName info gpuIDs := 
strings.Join(gpuSchedulingResult.(*GPUSchedulingStateData).FinalGPUs, ",") s.logger.Info("PostBinding pod for GPU resources", "pod", pod.Name, "node", nodeName, "gpuIDs", gpuIDs) - patch := []byte(`[{ - "op": "add", - "path": "/metadata/annotations/` + utils.EscapeJSONPointer(constants.GPUDeviceIDsAnnotation) + `", - "value": "` + gpuIDs + `"}]`) - err = s.client.Patch(s.ctx, pod, client.RawPatch(types.JSONPatchType, patch)) + // Prepare patches for both GPU IDs and index cleanup + patches := []string{ + `{ + "op": "add", + "path": "/metadata/annotations/` + utils.EscapeJSONPointer(constants.GPUDeviceIDsAnnotation) + `", + "value": "` + gpuIDs + `" + }`, + } + + // Remove index annotation after Device Plugin allocation is complete + // This releases the temporary index for reuse + if pod.Annotations != nil { + if indexStr, exists := pod.Annotations[constants.PodIndexAnnotation]; exists && indexStr != "" { + index, parseErr := strconv.Atoi(indexStr) + if parseErr == nil && s.indexAllocator != nil { + // Release index asynchronously (will be cleaned up when Pod is deleted) + _ = s.indexAllocator.ReleaseIndex(pod.Name, index, false) + + // Remove annotation immediately to prevent matching in next cycle + patches = append(patches, `{ + "op": "remove", + "path": "/metadata/annotations/`+utils.EscapeJSONPointer(constants.PodIndexAnnotation)+`" + }`) + } + } + } + + patchJSON := "[" + strings.Join(patches, ",") + "]" + err = s.client.Patch(s.ctx, pod, client.RawPatch(types.JSONPatchType, []byte(patchJSON))) if err != nil { s.logger.Error(err, "failed to patch gpu device ids", "pod", pod.Name) s.fh.EventRecorder().Eventf(pod, pod, v1.EventTypeWarning, "GPUDeviceAllocatedFailed", diff --git a/internal/scheduler/gpuresources/gpuresources_test.go b/internal/scheduler/gpuresources/gpuresources_test.go index 5707a640..ef91b470 100644 --- a/internal/scheduler/gpuresources/gpuresources_test.go +++ b/internal/scheduler/gpuresources/gpuresources_test.go @@ -263,7 +263,7 @@ func (s *GPUResourcesSuite) SetupTest() { s.allocator.ReconcileAllocationState() s.allocator.SetAllocatorReady() - pluginFactory := NewWithDeps(s.allocator, s.client) + pluginFactory := NewWithDeps(s.allocator, s.client, nil) pluginConfig := &runtime.Unknown{ Raw: []byte(`{ "maxWorkerPerNode": 3, @@ -597,7 +597,7 @@ func (s *GPUResourcesSuite) makePod(name string, annotations map[string]string) func (s *GPUResourcesSuite) TestNewWithDeps() { log.FromContext(s.ctx).Info("Running TestNewWithDeps") - pluginFactory := NewWithDeps(s.allocator, s.client) + pluginFactory := NewWithDeps(s.allocator, s.client, nil) s.NotNil(pluginFactory) // Test with valid config diff --git a/internal/server/router/assign_index.go b/internal/server/router/assign_index.go new file mode 100644 index 00000000..a2b60d40 --- /dev/null +++ b/internal/server/router/assign_index.go @@ -0,0 +1,64 @@ +package router + +import ( + "context" + "fmt" + "net/http" + + "github.com/NexusGPU/tensor-fusion/internal/constants" + "github.com/NexusGPU/tensor-fusion/internal/indexallocator" + "github.com/NexusGPU/tensor-fusion/internal/utils" + "github.com/gin-gonic/gin" + v1 "k8s.io/api/authentication/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/log" +) + +const assignIndexTokenReviewName = "assign-index-token-review" + +type AssignIndexRouter struct { + allocator *indexallocator.IndexAllocator +} + +func NewAssignIndexRouter(ctx context.Context, allocator *indexallocator.IndexAllocator) (*AssignIndexRouter, error) { + return 
&AssignIndexRouter{allocator: allocator}, nil
+}
+
+func (r *AssignIndexRouter) AssignIndex(ctx *gin.Context) {
+	podName := ctx.Query("podName")
+	token := ctx.Request.Header.Get(constants.AuthorizationHeader)
+
+	if token == "" {
+		log.FromContext(ctx).Error(nil, "assigned index failed, missing token", "podName", podName)
+		ctx.String(http.StatusUnauthorized, "missing authorization header")
+		return
+	}
+	tokenReview := &v1.TokenReview{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: assignIndexTokenReviewName,
+		},
+		Spec: v1.TokenReviewSpec{
+			Token: token,
+		},
+	}
+	if err := r.allocator.Client.Create(ctx, tokenReview); err != nil {
+		log.FromContext(ctx).Error(err, "assigned index failed, auth endpoint error", "podName", podName)
+		ctx.String(http.StatusInternalServerError, "auth endpoint error")
+		return
+	}
+	if !tokenReview.Status.Authenticated || tokenReview.Status.User.Username != utils.GetSelfServiceAccountNameFull() {
+		log.FromContext(ctx).Error(nil, "assigned index failed, token invalid", "podName", podName)
+		ctx.String(http.StatusUnauthorized, "token authentication failed")
+		return
+	}
+
+	index, err := r.allocator.AssignIndex(podName)
+	if err != nil {
+		log.FromContext(ctx).Error(err, "assigned index failed", "podName", podName)
+		ctx.String(http.StatusInternalServerError, err.Error())
+		return
+	}
+	log.FromContext(ctx).Info("assigned index successfully", "podName", podName, "index", index)
+	ctx.String(http.StatusOK, fmt.Sprintf("%d", index))
+}
+
diff --git a/internal/server/server.go b/internal/server/server.go
index f002e446..432fb61d 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -13,6 +13,7 @@ import (
 func NewHTTPServer(
 	cr *router.ConnectionRouter,
 	ahp *router.AssignHostPortRouter,
+	ai *router.AssignIndexRouter,
 	alc *router.AllocatorInfoRouter,
 	leaderChan <-chan struct{},
 ) *gin.Engine {
@@ -39,6 +40,19 @@ func NewHTTPServer(
 		// suspend API call utils it becomes leader
 		ahp.AssignHostPort(ctx)
 	})
+	apiGroup.GET("/assign-index", func(ctx *gin.Context) {
+		if leaderChan == nil {
+			ctx.String(http.StatusServiceUnavailable, "current instance is not the leader")
+			return
+		}
+		select {
+		case <-ctx.Done():
+			return
+		case <-leaderChan:
+		}
+		// suspend API call until it becomes leader
+		ai.AssignIndex(ctx)
+	})
 	apiGroup.GET("/provision", func(ctx *gin.Context) {
 		controller.ProvisioningToggle = ctx.Query("enable") == "true"
 	})
diff --git a/internal/webhook/v1/pod_webhook.go b/internal/webhook/v1/pod_webhook.go
index 71243776..9d016e63 100644
--- a/internal/webhook/v1/pod_webhook.go
+++ b/internal/webhook/v1/pod_webhook.go
@@ -29,6 +29,7 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/equality"
 	"k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/api/resource"
 	"k8s.io/apimachinery/pkg/util/strategicpatch"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
@@ -38,6 +39,7 @@ import (
 	tfv1 "github.com/NexusGPU/tensor-fusion/api/v1"
 	"github.com/NexusGPU/tensor-fusion/internal/cloudprovider/pricing"
 	"github.com/NexusGPU/tensor-fusion/internal/constants"
+	"github.com/NexusGPU/tensor-fusion/internal/indexallocator"
 	"github.com/NexusGPU/tensor-fusion/internal/portallocator"
 	"github.com/NexusGPU/tensor-fusion/internal/utils"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -46,24 +48,26 @@ import (
 var httpClient = &http.Client{Timeout: 10 * time.Second}

 // SetupPodWebhookWithManager registers the webhook for Pod in the manager.
-func SetupPodWebhookWithManager(mgr ctrl.Manager, portAllocator *portallocator.PortAllocator, pricingProvider pricing.PricingProvider) error { +func SetupPodWebhookWithManager(mgr ctrl.Manager, portAllocator *portallocator.PortAllocator, indexAllocator *indexallocator.IndexAllocator, pricingProvider pricing.PricingProvider) error { webhookServer := mgr.GetWebhookServer() webhookServer.Register("/mutate-v1-pod", &admission.Webhook{ Handler: &TensorFusionPodMutator{ - decoder: admission.NewDecoder(mgr.GetScheme()), - Client: mgr.GetClient(), - portAllocator: portAllocator, + decoder: admission.NewDecoder(mgr.GetScheme()), + Client: mgr.GetClient(), + portAllocator: portAllocator, + indexAllocator: indexAllocator, }, }) return nil } type TensorFusionPodMutator struct { - Client client.Client - decoder admission.Decoder - portAllocator *portallocator.PortAllocator + Client client.Client + decoder admission.Decoder + portAllocator *portallocator.PortAllocator + indexAllocator *indexallocator.IndexAllocator } // Handle implements admission.Handler interface. @@ -175,12 +179,13 @@ func (m *TensorFusionPodMutator) Handle(ctx context.Context, req admission.Reque // Add priorityClass if contains higher QoS level and Pod priority class not specified if pod.Spec.PriorityClassName == "" && (tfInfo.Profile.Qos == tfv1.QoSHigh || tfInfo.Profile.Qos == tfv1.QoSCritical) { - pod.Spec.PriorityClassName = constants.TensorFusionSystemName + string(tfInfo.Profile.Qos) + pod.Spec.PriorityClassName = fmt.Sprintf("%s-%s", + constants.TensorFusionSystemName, string(tfInfo.Profile.Qos)) } // Inject initContainer and env variables patches, err := m.patchTFClient( - pod, pool, tfInfo.Profile.IsLocalGPU, currentBytes, containerIndices, tfInfo.Profile.SidecarWorker, + ctx, pod, pool, tfInfo.Profile.IsLocalGPU, currentBytes, containerIndices, tfInfo.Profile.SidecarWorker, ) if err != nil { log.Error(err, "failed to patch tf client", "pod", req.Name, "namespace", req.Namespace) @@ -265,6 +270,7 @@ func (m *TensorFusionPodMutator) createOrUpdateWorkload( } func (m *TensorFusionPodMutator) patchTFClient( + ctx context.Context, pod *corev1.Pod, pool *tfv1.GPUPool, isLocalGPU bool, @@ -281,6 +287,47 @@ func (m *TensorFusionPodMutator) patchTFClient( assignPodLabelsAndAnnotations(isLocalGPU, pod, pool) + // Assign index once per pod (before processing containers) + // Index must be assigned in webhook stage since scheduler cannot modify Pod + // This is a special index resource (1-512), not a real device resource + // Index is assigned in ascending order (1, 2, 3, ...) 
via distributed lock (leader election) + var index int + var indexErr error + podIdentifier := pod.Name + if podIdentifier == "" { + // For Deployment/StatefulSet created pods, Name might be empty, use GenerateName + UID + podIdentifier = pod.GenerateName + string(pod.UID) + } + + if m.indexAllocator != nil && m.indexAllocator.IsLeader { + index, indexErr = m.indexAllocator.AssignIndex(podIdentifier) + if indexErr != nil { + log := log.FromContext(ctx) + log.Error(indexErr, "failed to assign index for pod", "pod", podIdentifier) + index = 0 + } + } else if m.indexAllocator != nil && !m.indexAllocator.IsLeader { + // If not leader, get index from leader via HTTP API (similar to port allocation) + // This ensures global increment across distributed webhook instances + index, indexErr = m.assignIndexFromLeader(ctx, pod) + if indexErr != nil { + log := log.FromContext(ctx) + log.Error(indexErr, "failed to assign index from leader", "pod", podIdentifier) + index = 0 + } + } else { + // No allocator available, use 0 as fallback + index = 0 + } + + // Set annotation for matching in Device Plugin + if pod.Annotations == nil { + pod.Annotations = make(map[string]string) + } + if index > 0 { + pod.Annotations[constants.PodIndexAnnotation] = strconv.Itoa(index) + } + for _, containerIndex := range containerIndices { container := &pod.Spec.Containers[containerIndex] containerJSON, err := json.Marshal(container) @@ -307,6 +354,19 @@ func (m *TensorFusionPodMutator) patchTFClient( removeNativeGPULimits(container) + // Inject tensor-fusion.ai/index resource for Device Plugin communication + // This is a special index resource (not a real device), used for Pod-to-DevicePlugin communication + if container.Resources.Requests == nil { + container.Resources.Requests = make(corev1.ResourceList) + } + if container.Resources.Limits == nil { + container.Resources.Limits = make(corev1.ResourceList) + } + // Request is 0 to prevent scheduler resource sum calculation issues + container.Resources.Requests[constants.PodIndexAnnotation] = resource.MustParse("0") + // Limit is set to actual index value (1-512) for Device Plugin to match Pod + container.Resources.Limits[constants.PodIndexAnnotation] = resource.MustParse(strconv.Itoa(index)) + if !isLocalGPU { addConnectionForRemoteFixedReplicaVirtualGPU(pod, container, clientConfig) } else if isSidecarWorker { @@ -525,6 +585,42 @@ func (m *TensorFusionPodMutator) assignClusterHostPortFromLeader(pod *corev1.Pod return strconv.Atoi(string(body)) } +func (m *TensorFusionPodMutator) assignIndexFromLeader(ctx context.Context, pod *corev1.Pod) (int, error) { + leaderIP := m.indexAllocator.GetLeaderIP() + if leaderIP == "" { + return 0, fmt.Errorf("operator leader IP not found") + } + + podIdentifier := pod.Name + if podIdentifier == "" { + podIdentifier = pod.GenerateName + string(pod.UID) + } + urlStr := fmt.Sprintf("http://%s:8080/assign-index?podName=%s", leaderIP, podIdentifier) + req, err := http.NewRequestWithContext(ctx, "GET", urlStr, nil) + if err != nil { + return 0, err + } + req.Header.Set(constants.AuthorizationHeader, "Bearer "+utils.ReadServiceAccountToken()) + resp, err := httpClient.Do(req) + if err != nil { + return 0, fmt.Errorf("failed to assign index: %w", err) + } + defer func() { + _ = resp.Body.Close() + }() + + if resp.StatusCode != http.StatusOK { + return 0, fmt.Errorf("index allocation failed: %s", resp.Status) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + return 0, fmt.Errorf("failed to read allocation response: %w", err) + } + + 
return strconv.Atoi(string(body)) +} + func calculateQoSLevel(profile *tfv1.WorkloadProfileSpec, pool *tfv1.GPUPool) tfv1.QoSLevel { // when not set, assign default QoS if profile.Qos == "" { diff --git a/internal/webhook/v1/pod_webhook_test.go b/internal/webhook/v1/pod_webhook_test.go index ca1dc4fc..d59d2ebf 100644 --- a/internal/webhook/v1/pod_webhook_test.go +++ b/internal/webhook/v1/pod_webhook_test.go @@ -896,7 +896,7 @@ var _ = Describe("TensorFusionPodMutator", func() { currentBytes, err := json.Marshal(pod) Expect(err).NotTo(HaveOccurred()) - patch, err := mutator.patchTFClient(pod, pool, false, currentBytes, []int{0}, false) + patch, err := mutator.patchTFClient(context.Background(), pod, pool, false, currentBytes, []int{0}, false) Expect(err).NotTo(HaveOccurred()) Expect(patch).NotTo(BeEmpty()) // There should be at least 2 patches (initContainers and the container env patches) diff --git a/internal/webhook/v1/webhook_suite_test.go b/internal/webhook/v1/webhook_suite_test.go index 26a6685d..5e96bd22 100644 --- a/internal/webhook/v1/webhook_suite_test.go +++ b/internal/webhook/v1/webhook_suite_test.go @@ -141,7 +141,7 @@ var _ = BeforeSuite(func() { PortRangeStartCluster: 42000, PortRangeEndCluster: 62000, BitmapCluster: make([]uint64, (62000-42000)/64+1), - }, mockPricingProvider) + }, nil, mockPricingProvider) Expect(err).NotTo(HaveOccurred()) // +kubebuilder:scaffold:webhook From f9f3d988836fa0cf251ba7a886faa564a58d3432 Mon Sep 17 00:00:00 2001 From: Joey <569475269@qq.com> Date: Mon, 10 Nov 2025 17:29:47 +0800 Subject: [PATCH 2/9] fix: simplify index allocation --- internal/constants/vendors.go | 20 +- internal/indexallocator/indexallocator.go | 207 ++---------------- .../indexallocator/indexallocator_test.go | 70 +----- internal/portallocator/portallocator.go | 14 +- .../scheduler/gpuresources/gpuresources.go | 35 +-- internal/utils/config.go | 22 +- internal/webhook/v1/pod_webhook.go | 5 +- 7 files changed, 65 insertions(+), 308 deletions(-) diff --git a/internal/constants/vendors.go b/internal/constants/vendors.go index 42871c71..f72c4636 100644 --- a/internal/constants/vendors.go +++ b/internal/constants/vendors.go @@ -8,22 +8,22 @@ const ( // DSA vendors - Global AcceleratorVendorQualcomm = "Qualcomm" - AcceleratorVendorAWSNeuron = "AWS-Neuron" - AcceleratorVendorGoogleTPU = "Google-TPU" + AcceleratorVendorAWSNeuron = "AWSNeuron" + AcceleratorVendorGoogleTPU = "Google" AcceleratorVendorCerebras = "Cerebras" // GPGPU vendors - CN - AcceleratorVendorHygon = "Hygon-DCU" - AcceleratorVendorMetaX = "Meta-X" + AcceleratorVendorHygon = "Hygon" + AcceleratorVendorMetaX = "MetaX" AcceleratorVendorMThreads = "MThreads" - AcceleratorVendorBiren = "BirenGPU" - AcceleratorVendorAlibabaTHead = "THead-PPU" + AcceleratorVendorBiren = "Biren" + AcceleratorVendorAlibabaTHead = "THead" // DSA vendors - CN - AcceleratorVendorHuaweiAscendNPU = "Ascend-NPU" - AcceleratorVendorCambricon = "Cambricon-MLU" - AcceleratorVendorEnflame = "Enflame-XPU" - AcceleratorVendorKunlunX = "KunlunXin-XPU" + AcceleratorVendorHuaweiAscendNPU = "Ascend" + AcceleratorVendorCambricon = "Cambricon" + AcceleratorVendorEnflame = "Enflame" + AcceleratorVendorKunlunX = "KunlunXin" AcceleratorVendorUnknown = "Unknown" ) diff --git a/internal/indexallocator/indexallocator.go b/internal/indexallocator/indexallocator.go index 7ad038ac..31bef633 100644 --- a/internal/indexallocator/indexallocator.go +++ b/internal/indexallocator/indexallocator.go @@ -3,10 +3,7 @@ package indexallocator import ( "context" "fmt" - 
"math/bits" - "strconv" - "sync" - "time" + "sync/atomic" "github.com/NexusGPU/tensor-fusion/internal/constants" "github.com/NexusGPU/tensor-fusion/internal/utils" @@ -14,7 +11,6 @@ import ( "k8s.io/client-go/util/retry" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/wait" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/log" @@ -24,33 +20,20 @@ import ( const ( IndexRangeStart = 1 IndexRangeEnd = 512 - IndexBitmapSize = (IndexRangeEnd - IndexRangeStart + 63) / 64 ) -const RELEASE_INDEX_RETRY_INTERVAL = 10 * time.Second - -var RETRY_CONFIG = wait.Backoff{ - Steps: 100, - Duration: RELEASE_INDEX_RETRY_INTERVAL, - Factor: 1.1, - Jitter: 0.1, -} - // IndexAllocator manages allocation of 1-512 temporary indices for Pod-to-DevicePlugin communication +// Uses a simple atomic counter that increments from 1 to 512, then wraps around to 1 +// No bitmap tracking needed - index reuse is acceptable after 512 cycles type IndexAllocator struct { IsLeader bool - Bitmap []uint64 + // Atomic counter for index allocation (1-512, wraps around) + currentIndex int64 Client client.Client - storeMutex sync.RWMutex - ctx context.Context - - indexReleaseQueue chan struct { - podName string - index int - } + ctx context.Context } func NewIndexAllocator(ctx context.Context, client client.Client) (*IndexAllocator, error) { @@ -59,21 +42,12 @@ func NewIndexAllocator(ctx context.Context, client client.Client) (*IndexAllocat } allocator := &IndexAllocator{ - Client: client, - IsLeader: false, - Bitmap: make([]uint64, IndexBitmapSize), - - storeMutex: sync.RWMutex{}, - ctx: ctx, - - indexReleaseQueue: make(chan struct { - podName string - index int - }), + Client: client, + IsLeader: false, + currentIndex: 0, // Will start from 1 on first assignment + ctx: ctx, } - go allocator.releaseIndexUntilPodDeleted() - return allocator, nil } @@ -101,172 +75,23 @@ func (s *IndexAllocator) SetupWithManager(ctx context.Context, mgr manager.Manag log.FromContext(ctx).Error(err, "Failed to update leader IP info in ConfigMap") } - s.storeMutex.Lock() - defer s.storeMutex.Unlock() - - // Initialize bitmap from existing pods with index annotation - s.initBitmap(ctx) - readyCh <- struct{}{} return nil })) return readyCh } -func (s *IndexAllocator) GetLeaderIP() string { - leaderInfo := &v1.ConfigMap{} - err := s.Client.Get(context.Background(), client.ObjectKey{ - Name: constants.LeaderInfoConfigMapName, - Namespace: utils.CurrentNamespace(), - }, leaderInfo) - if err != nil { - log.FromContext(context.Background()).Error(err, "Failed to get leader IP info from ConfigMap") - return "" - } - if leaderInfo.Data == nil { - return "" - } - return leaderInfo.Data[constants.LeaderInfoConfigMapLeaderIPKey] -} - // AssignIndex assigns a temporary index (1-512) for Pod-to-DevicePlugin communication -// Uses distributed lock via leader election to ensure global increment -// Index is assigned in ascending order (1, 2, 3, ...) to maintain consistency +// Uses atomic increment to ensure thread-safe assignment +// Index wraps around from 512 to 1 (simple modulo operation) func (s *IndexAllocator) AssignIndex(podName string) (int, error) { if !s.IsLeader { return 0, fmt.Errorf("only leader can assign index") } - s.storeMutex.Lock() - defer s.storeMutex.Unlock() - - // Find first available index in ascending order (1, 2, 3, ...) 
- // This ensures consistent index assignment across distributed webhook instances - // TrailingZeros64 finds the first zero bit (lowest available index) - for i, subMap := range s.Bitmap { - bitPos := bits.TrailingZeros64(^subMap) - indexOffset := i*64 + bitPos - if subMap != 0xFFFFFFFFFFFFFFFF { - assignedIndex := indexOffset + IndexRangeStart - if assignedIndex <= IndexRangeEnd { - // Mark this index as used - s.Bitmap[i] = subMap | (1 << bitPos) - return assignedIndex, nil - } else { - break - } - } - } - - // If all indices in first pass are used, wrap around and find first available - // This handles the case when 512 is reached and we need to reuse released indices - for i := 0; i < IndexBitmapSize; i++ { - for j := 0; j < 64; j++ { - indexOffset := i*64 + j - assignedIndex := indexOffset + IndexRangeStart - if assignedIndex > IndexRangeEnd { - break - } - // Check if this index is available (bit is 0) - if s.Bitmap[i]&(1< IndexRangeEnd { - return fmt.Errorf("index %d out of range [%d, %d]", index, IndexRangeStart, IndexRangeEnd) - } - - indexOffset := index - IndexRangeStart + // Atomic increment and wrap around + next := atomic.AddInt64(&s.currentIndex, 1) + index := int((next-1)%IndexRangeEnd) + IndexRangeStart - if immediateRelease { - s.storeMutex.Lock() - defer s.storeMutex.Unlock() - s.Bitmap[indexOffset/64] &^= 1 << (indexOffset % 64) - return nil - } else { - // Put into queue, release until Pod not found - s.indexReleaseQueue <- struct { - podName string - index int - }{podName, index} - } - return nil -} - -func (s *IndexAllocator) releaseIndexUntilPodDeleted() { - for item := range s.indexReleaseQueue { - podName := item.podName - indexOffset := item.index - IndexRangeStart - - _ = retry.OnError(RETRY_CONFIG, func(_ error) bool { - return true - }, func() error { - // Try to get pod by name from any namespace - podList := &v1.PodList{} - err := s.Client.List(s.ctx, podList) - if err != nil { - return err - } - - found := false - for i := range podList.Items { - if podList.Items[i].Name == podName { - found = true - break - } - } - - if !found { - s.storeMutex.Lock() - defer s.storeMutex.Unlock() - s.Bitmap[indexOffset/64] &^= 1 << (indexOffset % 64) - return nil - } - return fmt.Errorf("pod still there, cannot release index %d for pod %s", item.index, podName) - }) - } -} - -func (s *IndexAllocator) initBitmap(ctx context.Context) { - log := log.FromContext(ctx) - podList := &v1.PodList{} - err := s.Client.List(ctx, podList) - if err != nil { - log.Error(err, "failed to list pods for index bitmap initialization") - return - } - - for _, pod := range podList.Items { - if pod.Annotations == nil { - continue - } - indexStr, exists := pod.Annotations[constants.PodIndexAnnotation] - if !exists || indexStr == "" { - continue - } - index, err := strconv.Atoi(indexStr) - if err != nil { - log.V(5).Info("failed to parse index annotation", "pod", pod.Name, "index", indexStr) - continue - } - if index < IndexRangeStart || index > IndexRangeEnd { - log.V(5).Info("index out of range", "pod", pod.Name, "index", index) - continue - } - - // Check if pod is still running/starting, if not, don't mark as used - if pod.DeletionTimestamp != nil || pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed { - continue - } - - indexOffset := index - IndexRangeStart - s.Bitmap[indexOffset/64] |= 1 << (indexOffset % 64) - } + return index, nil } diff --git a/internal/indexallocator/indexallocator_test.go b/internal/indexallocator/indexallocator_test.go index 9e876735..c9d7069e 
100644 --- a/internal/indexallocator/indexallocator_test.go +++ b/internal/indexallocator/indexallocator_test.go @@ -6,8 +6,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client/fake" ) @@ -57,7 +55,7 @@ func TestIndexAllocator_AssignIndex_IncrementalOrder(t *testing.T) { } } -func TestIndexAllocator_ReleaseIndex(t *testing.T) { +func TestIndexAllocator_WrapAround(t *testing.T) { scheme := fake.NewClientBuilder().WithScheme(fake.NewClientBuilder().Build().Scheme()).Build().Scheme() client := fake.NewClientBuilder().WithScheme(scheme).Build() @@ -66,65 +64,15 @@ func TestIndexAllocator_ReleaseIndex(t *testing.T) { require.NoError(t, err) allocator.IsLeader = true - // Assign an index - index, err := allocator.AssignIndex("pod-1") - require.NoError(t, err) - - // Release it immediately - err = allocator.ReleaseIndex("pod-1", index, true) - assert.NoError(t, err) - - // Should be able to assign the same index again (wrapped around) - index2, err := allocator.AssignIndex("pod-2") - assert.NoError(t, err) - assert.Equal(t, index, index2, "released index should be available for reuse") -} - -func TestIndexAllocator_InitBitmap(t *testing.T) { - scheme := fake.NewClientBuilder().WithScheme(fake.NewClientBuilder().Build().Scheme()).Build().Scheme() - - // Create pods with index annotations - pod1 := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod-1", - Namespace: "default", - Annotations: map[string]string{ - "tensor-fusion.ai/index": "1", - }, - }, - Status: v1.PodStatus{ - Phase: v1.PodRunning, - }, - } - pod2 := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod-2", - Namespace: "default", - Annotations: map[string]string{ - "tensor-fusion.ai/index": "5", - }, - }, - Status: v1.PodStatus{ - Phase: v1.PodRunning, - }, + // Assign indices until we reach 512 + for i := 1; i <= 512; i++ { + index, err := allocator.AssignIndex("pod-" + string(rune(i))) + assert.NoError(t, err) + assert.Equal(t, i, index) } - client := fake.NewClientBuilder().WithScheme(scheme).WithObjects(pod1, pod2).Build() - - ctx := context.Background() - allocator, err := NewIndexAllocator(ctx, client) - require.NoError(t, err) - - // Initialize bitmap - allocator.initBitmap(ctx) - - // Check that indices 1 and 5 are marked as used - assert.True(t, allocator.Bitmap[0]&(1<<0) != 0) // index 1 - assert.True(t, allocator.Bitmap[0]&(1<<4) != 0) // index 5 - - // Next assignment should start from 2 (first available after 1 and 5) - allocator.IsLeader = true - index, err := allocator.AssignIndex("pod-3") + // Next assignment should wrap around to 1 + index, err := allocator.AssignIndex("pod-513") assert.NoError(t, err) - assert.Equal(t, 2, index, "should assign first available index (2) after 1 and 5 are used") + assert.Equal(t, 1, index, "index should wrap around from 512 to 1") } diff --git a/internal/portallocator/portallocator.go b/internal/portallocator/portallocator.go index 1fba53df..1a050eee 100644 --- a/internal/portallocator/portallocator.go +++ b/internal/portallocator/portallocator.go @@ -152,19 +152,7 @@ func (s *PortAllocator) SetupWithManager(ctx context.Context, mgr manager.Manage } func (s *PortAllocator) GetLeaderIP() string { - leaderInfo := &v1.ConfigMap{} - err := s.Client.Get(context.Background(), client.ObjectKey{ - Name: constants.LeaderInfoConfigMapName, - Namespace: utils.CurrentNamespace(), - }, leaderInfo) - if err != nil { - 
log.FromContext(context.Background()).Error(err, "Failed to get leader IP info from ConfigMap") - return "" - } - if leaderInfo.Data == nil { - return "" - } - return leaderInfo.Data[constants.LeaderInfoConfigMapLeaderIPKey] + return utils.GetLeaderIP(s.Client) } // AssignHostPort always called by operator itself, thus no Leader-Follower inconsistency issue diff --git a/internal/scheduler/gpuresources/gpuresources.go b/internal/scheduler/gpuresources/gpuresources.go index 9c129c32..5b801203 100644 --- a/internal/scheduler/gpuresources/gpuresources.go +++ b/internal/scheduler/gpuresources/gpuresources.go @@ -480,35 +480,12 @@ func (s *GPUFit) PostBind(ctx context.Context, state fwk.CycleState, pod *v1.Pod gpuIDs := strings.Join(gpuSchedulingResult.(*GPUSchedulingStateData).FinalGPUs, ",") s.logger.Info("PostBinding pod for GPU resources", "pod", pod.Name, "node", nodeName, "gpuIDs", gpuIDs) - // Prepare patches for both GPU IDs and index cleanup - patches := []string{ - `{ - "op": "add", - "path": "/metadata/annotations/` + utils.EscapeJSONPointer(constants.GPUDeviceIDsAnnotation) + `", - "value": "` + gpuIDs + `" - }`, - } - - // Remove index annotation after Device Plugin allocation is complete - // This releases the temporary index for reuse - if pod.Annotations != nil { - if indexStr, exists := pod.Annotations[constants.PodIndexAnnotation]; exists && indexStr != "" { - index, parseErr := strconv.Atoi(indexStr) - if parseErr == nil && s.indexAllocator != nil { - // Release index asynchronously (will be cleaned up when Pod is deleted) - _ = s.indexAllocator.ReleaseIndex(pod.Name, index, false) - - // Remove annotation immediately to prevent matching in next cycle - patches = append(patches, `{ - "op": "remove", - "path": "/metadata/annotations/`+utils.EscapeJSONPointer(constants.PodIndexAnnotation)+`" - }`) - } - } - } - - patchJSON := "[" + strings.Join(patches, ",") + "]" - err = s.client.Patch(s.ctx, pod, client.RawPatch(types.JSONPatchType, []byte(patchJSON))) + // Patch GPU device IDs annotation + patch := []byte(`[{ + "op": "add", + "path": "/metadata/annotations/` + utils.EscapeJSONPointer(constants.GPUDeviceIDsAnnotation) + `", + "value": "` + gpuIDs + `"}]`) + err = s.client.Patch(s.ctx, pod, client.RawPatch(types.JSONPatchType, patch)) if err != nil { s.logger.Error(err, "failed to patch gpu device ids", "pod", pod.Name) s.fh.EventRecorder().Eventf(pod, pod, v1.EventTypeWarning, "GPUDeviceAllocatedFailed", diff --git a/internal/utils/config.go b/internal/utils/config.go index ee4a228c..b7442644 100644 --- a/internal/utils/config.go +++ b/internal/utils/config.go @@ -1,7 +1,7 @@ package utils import ( - context "context" + "context" "encoding/base64" "encoding/json" "fmt" @@ -14,7 +14,9 @@ import ( "github.com/lithammer/shortuuid/v4" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/yaml" ) @@ -197,3 +199,21 @@ func IsProgressiveMigration() bool { func SetProgressiveMigration(isProgressiveMigration bool) { nvidiaOperatorProgressiveMigrationEnv = isProgressiveMigration } + +// GetLeaderIP retrieves the leader IP from the ConfigMap +// This is used by both PortAllocator and IndexAllocator for distributed allocation +func GetLeaderIP(client client.Client) string { + leaderInfo := &corev1.ConfigMap{} + err := client.Get(context.Background(), types.NamespacedName{ + Name: constants.LeaderInfoConfigMapName, + Namespace: CurrentNamespace(), 
+ }, leaderInfo) + if err != nil { + ctrl.Log.V(5).Info("Failed to get leader IP info from ConfigMap", "error", err) + return "" + } + if leaderInfo.Data == nil { + return "" + } + return leaderInfo.Data[constants.LeaderInfoConfigMapLeaderIPKey] +} diff --git a/internal/webhook/v1/pod_webhook.go b/internal/webhook/v1/pod_webhook.go index 9d016e63..8c4f6365 100644 --- a/internal/webhook/v1/pod_webhook.go +++ b/internal/webhook/v1/pod_webhook.go @@ -553,8 +553,7 @@ func (m *TensorFusionPodMutator) getPortNumber(pod *corev1.Pod) int { } func (m *TensorFusionPodMutator) assignClusterHostPortFromLeader(pod *corev1.Pod) (int, error) { - - leaderIP := m.portAllocator.GetLeaderIP() + leaderIP := utils.GetLeaderIP(m.Client) if leaderIP == "" { return 0, fmt.Errorf("operator leader IP not found") } @@ -586,7 +585,7 @@ func (m *TensorFusionPodMutator) assignClusterHostPortFromLeader(pod *corev1.Pod } func (m *TensorFusionPodMutator) assignIndexFromLeader(ctx context.Context, pod *corev1.Pod) (int, error) { - leaderIP := m.indexAllocator.GetLeaderIP() + leaderIP := utils.GetLeaderIP(m.Client) if leaderIP == "" { return 0, fmt.Errorf("operator leader IP not found") } From d7db4ae865de08ecd18992cd7464eb2a51661542 Mon Sep 17 00:00:00 2001 From: Joey <569475269@qq.com> Date: Mon, 10 Nov 2025 17:32:23 +0800 Subject: [PATCH 3/9] fix: lint issue --- cmd/main.go | 9 +++--- .../scheduler/gpuresources/gpuresources.go | 29 +++++++++---------- .../gpuresources/gpuresources_test.go | 4 +-- internal/server/router/assign_index.go | 1 - 4 files changed, 20 insertions(+), 23 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index a4bce332..ae36f8f1 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -247,7 +247,7 @@ func main() { pricingProvider := pricing.NewStaticPricingProvider() startWebhook(mgr, portAllocator, indexAllocator, pricingProvider) - scheduler, nodeExpander := startScheduler(ctx, allocator, indexAllocator, mgr, k8sVersion) + scheduler, nodeExpander := startScheduler(ctx, allocator, mgr, k8sVersion) startCustomResourceController(ctx, mgr, metricsRecorder, allocator, portAllocator, nodeExpander) @@ -330,7 +330,9 @@ func startHttpServerForTFClient( setupLog.Error(err, "failed to create allocator info router") os.Exit(1) } - httpServer := server.NewHTTPServer(connectionRouter, assignHostPortRouter, assignIndexRouter, allocatorInfoRouter, leaderChan) + httpServer := server.NewHTTPServer( + connectionRouter, assignHostPortRouter, assignIndexRouter, allocatorInfoRouter, leaderChan, + ) go func() { err := httpServer.Run() if err != nil { @@ -498,7 +500,6 @@ func startWebhook( func startScheduler( ctx context.Context, allocator *gpuallocator.GpuAllocator, - indexAllocator *indexallocator.IndexAllocator, mgr manager.Manager, k8sVersion *k8sVer.Version, ) (*scheduler.Scheduler, *expander.NodeExpander) { @@ -512,7 +513,7 @@ func startScheduler( gpuResourceFitOpt := app.WithPlugin( gpuResourceFitPlugin.Name, - gpuResourceFitPlugin.NewWithDeps(allocator, mgr.GetClient(), indexAllocator), + gpuResourceFitPlugin.NewWithDeps(allocator, mgr.GetClient()), ) gpuTopoOpt := app.WithPlugin( gpuTopoPlugin.Name, diff --git a/internal/scheduler/gpuresources/gpuresources.go b/internal/scheduler/gpuresources/gpuresources.go index 5b801203..c3759fad 100644 --- a/internal/scheduler/gpuresources/gpuresources.go +++ b/internal/scheduler/gpuresources/gpuresources.go @@ -13,7 +13,6 @@ import ( "github.com/NexusGPU/tensor-fusion/internal/config" "github.com/NexusGPU/tensor-fusion/internal/constants" 
"github.com/NexusGPU/tensor-fusion/internal/gpuallocator" - "github.com/NexusGPU/tensor-fusion/internal/indexallocator" "github.com/NexusGPU/tensor-fusion/internal/metrics" "github.com/NexusGPU/tensor-fusion/internal/quota" "github.com/NexusGPU/tensor-fusion/internal/utils" @@ -43,13 +42,12 @@ var _ framework.PostBindPlugin = &GPUFit{} var _ framework.EnqueueExtensions = &GPUFit{} type GPUFit struct { - logger *klog.Logger - fh framework.Handle - client client.Client - allocator *gpuallocator.GpuAllocator - indexAllocator *indexallocator.IndexAllocator - ctx context.Context - cfg *config.GPUFitConfig + logger *klog.Logger + fh framework.Handle + client client.Client + allocator *gpuallocator.GpuAllocator + ctx context.Context + cfg *config.GPUFitConfig } type GPUSchedulingStateData struct { @@ -82,7 +80,7 @@ func (p *GPUSchedulingStateData) Clone() fwk.StateData { type PluginFactoryFunc func(ctx context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) -func NewWithDeps(allocator *gpuallocator.GpuAllocator, client client.Client, indexAllocator *indexallocator.IndexAllocator) PluginFactoryFunc { +func NewWithDeps(allocator *gpuallocator.GpuAllocator, client client.Client) PluginFactoryFunc { return func(ctx context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) { target := &config.GPUFitConfig{} if unknown, ok := obj.(*runtime.Unknown); ok { @@ -93,13 +91,12 @@ func NewWithDeps(allocator *gpuallocator.GpuAllocator, client client.Client, ind lh := klog.FromContext(ctx).WithValues("plugin", Name) lh.Info("Creating new GPUFit plugin") c := &GPUFit{ - logger: &lh, - fh: handle, - cfg: target, - allocator: allocator, - indexAllocator: indexAllocator, - ctx: ctx, - client: client, + logger: &lh, + fh: handle, + cfg: target, + allocator: allocator, + ctx: ctx, + client: client, } lh.Info("Created new GPUFit plugin", "plugin", c) diff --git a/internal/scheduler/gpuresources/gpuresources_test.go b/internal/scheduler/gpuresources/gpuresources_test.go index ef91b470..5707a640 100644 --- a/internal/scheduler/gpuresources/gpuresources_test.go +++ b/internal/scheduler/gpuresources/gpuresources_test.go @@ -263,7 +263,7 @@ func (s *GPUResourcesSuite) SetupTest() { s.allocator.ReconcileAllocationState() s.allocator.SetAllocatorReady() - pluginFactory := NewWithDeps(s.allocator, s.client, nil) + pluginFactory := NewWithDeps(s.allocator, s.client) pluginConfig := &runtime.Unknown{ Raw: []byte(`{ "maxWorkerPerNode": 3, @@ -597,7 +597,7 @@ func (s *GPUResourcesSuite) makePod(name string, annotations map[string]string) func (s *GPUResourcesSuite) TestNewWithDeps() { log.FromContext(s.ctx).Info("Running TestNewWithDeps") - pluginFactory := NewWithDeps(s.allocator, s.client, nil) + pluginFactory := NewWithDeps(s.allocator, s.client) s.NotNil(pluginFactory) // Test with valid config diff --git a/internal/server/router/assign_index.go b/internal/server/router/assign_index.go index a2b60d40..b3f0c6a0 100644 --- a/internal/server/router/assign_index.go +++ b/internal/server/router/assign_index.go @@ -61,4 +61,3 @@ func (r *AssignIndexRouter) AssignIndex(ctx *gin.Context) { log.FromContext(ctx).Info("assigned index successfully", "podName", podName, "index", index) ctx.String(http.StatusOK, fmt.Sprintf("%d", index)) } - From 521df0da690d188fc6186aa3c6ca9378b8d6226c Mon Sep 17 00:00:00 2001 From: Joey <569475269@qq.com> Date: Mon, 10 Nov 2025 17:38:14 +0800 Subject: [PATCH 4/9] fix: optimize func length --- internal/webhook/v1/pod_webhook.go | 78 
++++++++++++++++-------------- 1 file changed, 42 insertions(+), 36 deletions(-) diff --git a/internal/webhook/v1/pod_webhook.go b/internal/webhook/v1/pod_webhook.go index 8c4f6365..61e60b1b 100644 --- a/internal/webhook/v1/pod_webhook.go +++ b/internal/webhook/v1/pod_webhook.go @@ -291,42 +291,8 @@ func (m *TensorFusionPodMutator) patchTFClient( // Index must be assigned in webhook stage since scheduler cannot modify Pod // This is a special index resource (1-512), not a real device resource // Index is assigned in ascending order (1, 2, 3, ...) via distributed lock (leader election) - var index int - var indexErr error - podIdentifier := pod.Name - if podIdentifier == "" { - // For Deployment/StatefulSet created pods, Name might be empty, use GenerateName + UID - podIdentifier = pod.GenerateName + string(pod.UID) - } - - if m.indexAllocator != nil && m.indexAllocator.IsLeader { - index, indexErr = m.indexAllocator.AssignIndex(podIdentifier) - if indexErr != nil { - log := log.FromContext(ctx) - log.Error(indexErr, "failed to assign index for pod", "pod", podIdentifier) - index = 0 - } - } else if m.indexAllocator != nil && !m.indexAllocator.IsLeader { - // If not leader, get index from leader via HTTP API (similar to port allocation) - // This ensures global increment across distributed webhook instances - index, indexErr = m.assignIndexFromLeader(ctx, pod) - if indexErr != nil { - log := log.FromContext(ctx) - log.Error(indexErr, "failed to assign index from leader", "pod", podIdentifier) - index = 0 - } - } else { - // No allocator available, use 0 as fallback - index = 0 - } - - // Set annotation for matching in Device Plugin - if pod.Annotations == nil { - pod.Annotations = make(map[string]string) - } - if index > 0 { - pod.Annotations[constants.PodIndexAnnotation] = strconv.Itoa(index) - } + index := m.assignDeviceAllocationIndex(ctx, pod) + log.FromContext(ctx).Info("assigned device allocation index successfully", "index", index, "pod", pod.Name) for _, containerIndex := range containerIndices { container := &pod.Spec.Containers[containerIndex] @@ -411,6 +377,46 @@ func (m *TensorFusionPodMutator) patchTFClient( return patches, nil } +func (m *TensorFusionPodMutator) assignDeviceAllocationIndex(ctx context.Context, pod *corev1.Pod) int { + var index int + var indexErr error + podIdentifier := pod.Name + if podIdentifier == "" { + // For Deployment/StatefulSet created pods, Name might be empty, use GenerateName + UID + podIdentifier = pod.GenerateName + string(pod.UID) + } + + if m.indexAllocator != nil && m.indexAllocator.IsLeader { + index, indexErr = m.indexAllocator.AssignIndex(podIdentifier) + if indexErr != nil { + log := log.FromContext(ctx) + log.Error(indexErr, "failed to assign index for pod", "pod", podIdentifier) + index = 0 + } + } else if m.indexAllocator != nil && !m.indexAllocator.IsLeader { + // If not leader, get index from leader via HTTP API (similar to port allocation) + // This ensures global increment across distributed webhook instances + index, indexErr = m.assignIndexFromLeader(ctx, pod) + if indexErr != nil { + log := log.FromContext(ctx) + log.Error(indexErr, "failed to assign index from leader", "pod", podIdentifier) + index = 0 + } + } else { + // No allocator available, use 0 as fallback + index = 0 + } + + // Set annotation for matching in Device Plugin + if pod.Annotations == nil { + pod.Annotations = make(map[string]string) + } + if index > 0 { + pod.Annotations[constants.PodIndexAnnotation] = strconv.Itoa(index) + } + return index +} + // Convert 
the strategic merge patch to JSON func calculatePodPatch(currentBytes []byte, pod *corev1.Pod, clientConfig *tfv1.ClientConfig, isLocalGPU bool) ([]jsonpatch.JsonPatchOperation, error) { var patchBytes []byte From a23f4a72674c79940bde74e6ef96df38acf4ae8b Mon Sep 17 00:00:00 2001 From: Joey <569475269@qq.com> Date: Tue, 11 Nov 2025 13:33:47 +0800 Subject: [PATCH 5/9] fix: add toleration for tensor-fusion managed nodes --- internal/constants/constants.go | 1 + internal/webhook/v1/pod_webhook.go | 8 ++++++++ internal/worker/worker.go | 8 ++++++++ 3 files changed, 17 insertions(+) diff --git a/internal/constants/constants.go b/internal/constants/constants.go index a2705eda..fd84b9b3 100644 --- a/internal/constants/constants.go +++ b/internal/constants/constants.go @@ -122,6 +122,7 @@ const ( TensorFusionPodCounterKeyAnnotation = Domain + "/pod-counter-key" TensorFusionPodCountAnnotation = Domain + "/tf-pod-count" TensorFusionWorkerSuffix = "-tf" + NodeUsedByAnnotation = Domain + "/used-by" // For grey release TensorFusionEnabledReplicasAnnotation = Domain + "/enabled-replicas" diff --git a/internal/webhook/v1/pod_webhook.go b/internal/webhook/v1/pod_webhook.go index 61e60b1b..9bde043f 100644 --- a/internal/webhook/v1/pod_webhook.go +++ b/internal/webhook/v1/pod_webhook.go @@ -458,6 +458,14 @@ func assignPodLabelsAndAnnotations(isLocalGPU bool, pod *corev1.Pod, pool *tfv1. pod.Labels[constants.LabelComponent] = constants.ComponentWorker pod.Annotations[constants.EmbeddedWorkerAnnotation] = constants.TrueStringValue // no need to add port in local gpu mode, communication is done through shared memory in the same process + + // Add toleration for TensorFusion nodes + pod.Spec.Tolerations = append(pod.Spec.Tolerations, corev1.Toleration{ + Key: constants.NodeUsedByAnnotation, + Operator: corev1.TolerationOpEqual, + Value: constants.TensorFusionSystemName, + Effect: corev1.TaintEffectNoSchedule, + }) } else { pod.Labels[constants.LabelComponent] = constants.ComponentClient } diff --git a/internal/worker/worker.go b/internal/worker/worker.go index d803c6e2..b9337d4a 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -60,6 +60,14 @@ func (wg *WorkerGenerator) GenerateWorkerPod( spec.EnableServiceLinks = ptr.To(false) spec.SchedulerName = constants.SchedulerName + // tolerate the nodes that used by TensorFusion system + spec.Tolerations = append(spec.Tolerations, v1.Toleration{ + Key: constants.NodeUsedByAnnotation, + Operator: v1.TolerationOpEqual, + Value: constants.TensorFusionSystemName, + Effect: v1.TaintEffectNoSchedule, + }) + // Add labels to identify this pod as part of the workload labels, annotations := utils.AppendTFWorkerLabelsAndAnnotationsAfterTemplate(podTmpl, workload, containerName) From 64b7b73e61f180a107bf42176d80f5eb8b307435 Mon Sep 17 00:00:00 2001 From: Joey <569475269@qq.com> Date: Tue, 11 Nov 2025 13:50:59 +0800 Subject: [PATCH 6/9] fix: add/remove node taint to isolate scheduler in progressive migration mode --- internal/controller/gpunode_controller.go | 2 +- internal/gpuallocator/gpuallocator_test.go | 2 +- internal/gpuallocator/node_capacity.go | 29 ++++++++++++++++++++++ 3 files changed, 31 insertions(+), 2 deletions(-) diff --git a/internal/controller/gpunode_controller.go b/internal/controller/gpunode_controller.go index c1f34937..5fb0d3dd 100644 --- a/internal/controller/gpunode_controller.go +++ b/internal/controller/gpunode_controller.go @@ -188,7 +188,7 @@ func (r *GPUNodeReconciler) checkStatusAndUpdateVirtualCapacity( return nil } else { - 
gpuModels, err := gpuallocator.RefreshGPUNodeCapacity(ctx, r.Client, node, poolObj, r.Allocator) + gpuModels, err := gpuallocator.RefreshGPUNodeCapacity(ctx, r.Client, node, poolObj, r.Allocator, coreNode) if err != nil { return err } diff --git a/internal/gpuallocator/gpuallocator_test.go b/internal/gpuallocator/gpuallocator_test.go index bb3a494d..496818d3 100644 --- a/internal/gpuallocator/gpuallocator_test.go +++ b/internal/gpuallocator/gpuallocator_test.go @@ -97,7 +97,7 @@ var _ = Describe("GPU Allocator", func() { if err := k8sClient.Get(ctx, types.NamespacedName{Name: "test-pool"}, pool); err != nil { Expect(err).NotTo(HaveOccurred()) } - _, _ = RefreshGPUNodeCapacity(ctx, k8sClient, gpuNode, pool, allocator) + _, _ = RefreshGPUNodeCapacity(ctx, k8sClient, gpuNode, pool, allocator, nil) // Verify resources were reduced on the allocated GPU gpu := getGPU(gpus[0].Name) diff --git a/internal/gpuallocator/node_capacity.go b/internal/gpuallocator/node_capacity.go index cdeb9141..aa011bf1 100644 --- a/internal/gpuallocator/node_capacity.go +++ b/internal/gpuallocator/node_capacity.go @@ -6,15 +6,20 @@ import ( tfv1 "github.com/NexusGPU/tensor-fusion/api/v1" "github.com/NexusGPU/tensor-fusion/internal/constants" + "github.com/NexusGPU/tensor-fusion/internal/utils" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/kubernetes/pkg/util/taints" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" ) func RefreshGPUNodeCapacity( ctx context.Context, k8sClient client.Client, node *tfv1.GPUNode, pool *tfv1.GPUPool, allocator *GpuAllocator, + coreNode *corev1.Node, ) ([]string, error) { gpuList := &tfv1.GPUList{} if err := k8sClient.List(ctx, gpuList, client.MatchingLabels{constants.LabelKeyOwner: node.Name}); err != nil { @@ -76,6 +81,30 @@ func RefreshGPUNodeCapacity( if err != nil { return nil, fmt.Errorf("failed to update GPU node status: %w", err) } + + // check if need to update K8S node label + if utils.IsProgressiveMigration() && coreNode != nil { + taint := &corev1.Taint{ + Key: constants.NodeUsedByAnnotation, + Effect: corev1.TaintEffectNoSchedule, + } + needUpdateNode := false + if node.Status.AvailableVRAM.Equal(node.Status.TotalVRAM) && node.Status.AvailableTFlops.Equal(node.Status.TotalTFlops) { + // check if need to remove the taint + coreNode, needUpdateNode, _ = taints.RemoveTaint(coreNode, taint) + } else if !taints.TaintExists(coreNode.Spec.Taints, taint) { + // check if need to add the taint + coreNode, needUpdateNode, _ = taints.AddOrUpdateTaint(coreNode, taint) + } + if needUpdateNode { + log.FromContext(ctx).Info("Updating K8S node taints for isolation of tensor-fusion and non-tensor-fusion used nodes", + "node", coreNode.Name, "taint", taint.Key) + err := k8sClient.Update(ctx, coreNode) + if err != nil { + return nil, fmt.Errorf("failed to update K8S node: %w", err) + } + } + } } return gpuModels, nil } From ce0665ebbb4c60ebe8ebfdd96e4fe2715b451125 Mon Sep 17 00:00:00 2001 From: Joey <569475269@qq.com> Date: Tue, 11 Nov 2025 13:55:01 +0800 Subject: [PATCH 7/9] fix: add ignore resource group for scheduler --- charts/tensor-fusion/Chart.yaml | 2 +- charts/tensor-fusion/values.yaml | 2 ++ config/samples/scheduler-config.yaml | 2 ++ internal/webhook/v1/pod_webhook.go | 2 -- 4 files changed, 5 insertions(+), 3 deletions(-) diff --git a/charts/tensor-fusion/Chart.yaml b/charts/tensor-fusion/Chart.yaml index adf0639f..cf5b8c13 100644 --- a/charts/tensor-fusion/Chart.yaml +++ 
b/charts/tensor-fusion/Chart.yaml @@ -15,7 +15,7 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 1.7.4 +version: 1.7.5 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to diff --git a/charts/tensor-fusion/values.yaml b/charts/tensor-fusion/values.yaml index 3f7e9ebf..fbf1a9d0 100644 --- a/charts/tensor-fusion/values.yaml +++ b/charts/tensor-fusion/values.yaml @@ -208,6 +208,8 @@ schedulerConfig: totalIntranetBandWidthGBps: 100 - name: NodeResourcesFit args: + ignoredResourceGroups: + - "tensor-fusion.ai" scoringStrategy: resources: - name: cpu diff --git a/config/samples/scheduler-config.yaml b/config/samples/scheduler-config.yaml index 74860dbf..81f5550a 100644 --- a/config/samples/scheduler-config.yaml +++ b/config/samples/scheduler-config.yaml @@ -40,6 +40,8 @@ profiles: totalIntranetBandWidthGBps: 100 - name: NodeResourcesFit args: + ignoredResourceGroups: + - "tensor-fusion.ai" scoringStrategy: resources: - name: cpu diff --git a/internal/webhook/v1/pod_webhook.go b/internal/webhook/v1/pod_webhook.go index 9bde043f..66d7e2dc 100644 --- a/internal/webhook/v1/pod_webhook.go +++ b/internal/webhook/v1/pod_webhook.go @@ -328,8 +328,6 @@ func (m *TensorFusionPodMutator) patchTFClient( if container.Resources.Limits == nil { container.Resources.Limits = make(corev1.ResourceList) } - // Request is 0 to prevent scheduler resource sum calculation issues - container.Resources.Requests[constants.PodIndexAnnotation] = resource.MustParse("0") // Limit is set to actual index value (1-512) for Device Plugin to match Pod container.Resources.Limits[constants.PodIndexAnnotation] = resource.MustParse(strconv.Itoa(index)) From 440541063df1625ee2824f337485c6898843e390 Mon Sep 17 00:00:00 2001 From: Joey <569475269@qq.com> Date: Tue, 11 Nov 2025 13:58:01 +0800 Subject: [PATCH 8/9] fix: rename taint key var name --- internal/constants/constants.go | 2 +- internal/gpuallocator/node_capacity.go | 3 ++- internal/webhook/v1/pod_webhook.go | 2 +- internal/worker/worker.go | 2 +- 4 files changed, 5 insertions(+), 4 deletions(-) diff --git a/internal/constants/constants.go b/internal/constants/constants.go index fd84b9b3..557fdabd 100644 --- a/internal/constants/constants.go +++ b/internal/constants/constants.go @@ -122,7 +122,7 @@ const ( TensorFusionPodCounterKeyAnnotation = Domain + "/pod-counter-key" TensorFusionPodCountAnnotation = Domain + "/tf-pod-count" TensorFusionWorkerSuffix = "-tf" - NodeUsedByAnnotation = Domain + "/used-by" + NodeUsedByTaintKey = Domain + "/used-by" // For grey release TensorFusionEnabledReplicasAnnotation = Domain + "/enabled-replicas" diff --git a/internal/gpuallocator/node_capacity.go b/internal/gpuallocator/node_capacity.go index aa011bf1..56b79ace 100644 --- a/internal/gpuallocator/node_capacity.go +++ b/internal/gpuallocator/node_capacity.go @@ -85,8 +85,9 @@ func RefreshGPUNodeCapacity( // check if need to update K8S node label if utils.IsProgressiveMigration() && coreNode != nil { taint := &corev1.Taint{ - Key: constants.NodeUsedByAnnotation, + Key: constants.NodeUsedByTaintKey, Effect: corev1.TaintEffectNoSchedule, + Value: constants.TensorFusionSystemName, } needUpdateNode := false if 
node.Status.AvailableVRAM.Equal(node.Status.TotalVRAM) && node.Status.AvailableTFlops.Equal(node.Status.TotalTFlops) { diff --git a/internal/webhook/v1/pod_webhook.go b/internal/webhook/v1/pod_webhook.go index 66d7e2dc..55315584 100644 --- a/internal/webhook/v1/pod_webhook.go +++ b/internal/webhook/v1/pod_webhook.go @@ -459,7 +459,7 @@ func assignPodLabelsAndAnnotations(isLocalGPU bool, pod *corev1.Pod, pool *tfv1. // Add toleration for TensorFusion nodes pod.Spec.Tolerations = append(pod.Spec.Tolerations, corev1.Toleration{ - Key: constants.NodeUsedByAnnotation, + Key: constants.NodeUsedByTaintKey, Operator: corev1.TolerationOpEqual, Value: constants.TensorFusionSystemName, Effect: corev1.TaintEffectNoSchedule, diff --git a/internal/worker/worker.go b/internal/worker/worker.go index b9337d4a..18874a36 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -62,7 +62,7 @@ func (wg *WorkerGenerator) GenerateWorkerPod( // tolerate the nodes that used by TensorFusion system spec.Tolerations = append(spec.Tolerations, v1.Toleration{ - Key: constants.NodeUsedByAnnotation, + Key: constants.NodeUsedByTaintKey, Operator: v1.TolerationOpEqual, Value: constants.TensorFusionSystemName, Effect: v1.TaintEffectNoSchedule, From 4256077cac9e068d49ec567ca1307859938a086e Mon Sep 17 00:00:00 2001 From: Joey <569475269@qq.com> Date: Fri, 14 Nov 2025 22:39:36 +0800 Subject: [PATCH 9/9] fix: priority conflict with priorityClass issue --- internal/webhook/v1/pod_webhook.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/internal/webhook/v1/pod_webhook.go b/internal/webhook/v1/pod_webhook.go index 55315584..5d73f0e6 100644 --- a/internal/webhook/v1/pod_webhook.go +++ b/internal/webhook/v1/pod_webhook.go @@ -181,6 +181,9 @@ func (m *TensorFusionPodMutator) Handle(ctx context.Context, req admission.Reque (tfInfo.Profile.Qos == tfv1.QoSHigh || tfInfo.Profile.Qos == tfv1.QoSCritical) { pod.Spec.PriorityClassName = fmt.Sprintf("%s-%s", constants.TensorFusionSystemName, string(tfInfo.Profile.Qos)) + // Remove priority field if PriorityClassName is set, as Kubernetes Priority admission controller + // will compute priority from PriorityClassName and doesn't allow both fields to be set + pod.Spec.Priority = nil } // Inject initContainer and env variables
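
For reference, a minimal sketch of the mutation the last commit above applies: when the webhook derives PriorityClassName from the workload QoS, it also clears any explicit Priority value so the Kubernetes Priority admission controller can recompute it from the class (per the commit's own comment, both fields must not be set together). The applyQoSPriorityClass helper and the "tensor-fusion-" class-name prefix are illustrative stand-ins, not code from this repository.

// Illustrative sketch only; not part of the patch series.
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/utils/ptr"
)

// applyQoSPriorityClass mirrors the behaviour described above: derive the
// priority class name from the QoS level, then drop the numeric priority so
// the Priority admission controller recomputes it from the class.
func applyQoSPriorityClass(pod *corev1.Pod, qos string) {
	pod.Spec.PriorityClassName = fmt.Sprintf("tensor-fusion-%s", qos)
	pod.Spec.Priority = nil
}

func main() {
	pod := &corev1.Pod{}
	// Simulate a pod that already carries an explicit priority value.
	pod.Spec.Priority = ptr.To(int32(1000))
	applyQoSPriorityClass(pod, "critical")
	fmt.Println(pod.Spec.PriorityClassName, pod.Spec.Priority == nil) // tensor-fusion-critical true
}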