diff --git a/charts/tensor-fusion/Chart.yaml b/charts/tensor-fusion/Chart.yaml index adf0639f..cf5b8c13 100644 --- a/charts/tensor-fusion/Chart.yaml +++ b/charts/tensor-fusion/Chart.yaml @@ -15,7 +15,7 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 1.7.4 +version: 1.7.5 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to diff --git a/charts/tensor-fusion/values.yaml b/charts/tensor-fusion/values.yaml index 3f7e9ebf..fbf1a9d0 100644 --- a/charts/tensor-fusion/values.yaml +++ b/charts/tensor-fusion/values.yaml @@ -208,6 +208,8 @@ schedulerConfig: totalIntranetBandWidthGBps: 100 - name: NodeResourcesFit args: + ignoredResourceGroups: + - "tensor-fusion.ai" scoringStrategy: resources: - name: cpu diff --git a/cmd/main.go b/cmd/main.go index 216b3ff7..ae36f8f1 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -36,6 +36,7 @@ import ( "github.com/NexusGPU/tensor-fusion/internal/constants" "github.com/NexusGPU/tensor-fusion/internal/controller" "github.com/NexusGPU/tensor-fusion/internal/gpuallocator" + "github.com/NexusGPU/tensor-fusion/internal/indexallocator" "github.com/NexusGPU/tensor-fusion/internal/metrics" "github.com/NexusGPU/tensor-fusion/internal/portallocator" "github.com/NexusGPU/tensor-fusion/internal/scheduler/expander" @@ -232,17 +233,25 @@ func main() { // Initialize GPU allocator and set up watches allocator, portAllocator := startTensorFusionAllocators(ctx, mgr) + // Initialize Index allocator for Device Plugin communication + indexAllocator, err := indexallocator.NewIndexAllocator(ctx, mgr.GetClient()) + if err != nil { + setupLog.Error(err, "unable to set up index allocator") + os.Exit(1) + } + _ = indexAllocator.SetupWithManager(ctx, mgr) + startAutoScaler(mgr, allocator) // Create pricing provider for webhook pricingProvider := pricing.NewStaticPricingProvider() - startWebhook(mgr, portAllocator, pricingProvider) + startWebhook(mgr, portAllocator, indexAllocator, pricingProvider) scheduler, nodeExpander := startScheduler(ctx, allocator, mgr, k8sVersion) startCustomResourceController(ctx, mgr, metricsRecorder, allocator, portAllocator, nodeExpander) - startHttpServerForTFClient(ctx, kc, portAllocator, allocator, scheduler, mgr.Elected()) + startHttpServerForTFClient(ctx, kc, portAllocator, indexAllocator, allocator, scheduler, mgr.Elected()) // +kubebuilder:scaffold:builder addHealthCheckAPI(mgr) @@ -291,6 +300,7 @@ func startHttpServerForTFClient( ctx context.Context, kc *rest.Config, portAllocator *portallocator.PortAllocator, + indexAllocator *indexallocator.IndexAllocator, allocator *gpuallocator.GpuAllocator, scheduler *scheduler.Scheduler, leaderChan <-chan struct{}, @@ -310,12 +320,19 @@ func startHttpServerForTFClient( setupLog.Error(err, "failed to create assign host port router") os.Exit(1) } + assignIndexRouter, err := router.NewAssignIndexRouter(ctx, indexAllocator) + if err != nil { + setupLog.Error(err, "failed to create assign index router") + os.Exit(1) + } allocatorInfoRouter, err := router.NewAllocatorInfoRouter(ctx, allocator, scheduler) if err != nil { setupLog.Error(err, "failed to create allocator info router") os.Exit(1) } - httpServer := server.NewHTTPServer(connectionRouter, assignHostPortRouter, 
allocatorInfoRouter, leaderChan) + httpServer := server.NewHTTPServer( + connectionRouter, assignHostPortRouter, assignIndexRouter, allocatorInfoRouter, leaderChan, + ) go func() { err := httpServer.Run() if err != nil { @@ -468,12 +485,13 @@ func startCustomResourceController( func startWebhook( mgr manager.Manager, portAllocator *portallocator.PortAllocator, + indexAllocator *indexallocator.IndexAllocator, pricingProvider pricing.PricingProvider, ) { if os.Getenv(constants.EnableWebhookEnv) == constants.FalseStringValue { return } - if err := webhookcorev1.SetupPodWebhookWithManager(mgr, portAllocator, pricingProvider); err != nil { + if err := webhookcorev1.SetupPodWebhookWithManager(mgr, portAllocator, indexAllocator, pricingProvider); err != nil { setupLog.Error(err, "unable to create webhook", "webhook", "Pod") os.Exit(1) } diff --git a/config/samples/scheduler-config.yaml b/config/samples/scheduler-config.yaml index 74860dbf..81f5550a 100644 --- a/config/samples/scheduler-config.yaml +++ b/config/samples/scheduler-config.yaml @@ -40,6 +40,8 @@ profiles: totalIntranetBandWidthGBps: 100 - name: NodeResourcesFit args: + ignoredResourceGroups: + - "tensor-fusion.ai" scoringStrategy: resources: - name: cpu diff --git a/internal/constants/constants.go b/internal/constants/constants.go index 91ad841e..557fdabd 100644 --- a/internal/constants/constants.go +++ b/internal/constants/constants.go @@ -92,6 +92,9 @@ const ( // Additional worker pod template is set by user with /worker-pod-template annotation WorkerPodTemplateAnnotation = Domain + "/worker-pod-template" + // Pod index annotation for Device Plugin communication (1-512) + PodIndexAnnotation = Domain + "/index" + WorkloadModeAnnotation = Domain + "/workload-mode" WorkloadModeDynamic = "dynamic" WorkloadModeFixed = "fixed" @@ -119,6 +122,7 @@ const ( TensorFusionPodCounterKeyAnnotation = Domain + "/pod-counter-key" TensorFusionPodCountAnnotation = Domain + "/tf-pod-count" TensorFusionWorkerSuffix = "-tf" + NodeUsedByTaintKey = Domain + "/used-by" // For grey release TensorFusionEnabledReplicasAnnotation = Domain + "/enabled-replicas" diff --git a/internal/constants/vendors.go b/internal/constants/vendors.go index 42871c71..f72c4636 100644 --- a/internal/constants/vendors.go +++ b/internal/constants/vendors.go @@ -8,22 +8,22 @@ const ( // DSA vendors - Global AcceleratorVendorQualcomm = "Qualcomm" - AcceleratorVendorAWSNeuron = "AWS-Neuron" - AcceleratorVendorGoogleTPU = "Google-TPU" + AcceleratorVendorAWSNeuron = "AWSNeuron" + AcceleratorVendorGoogleTPU = "Google" AcceleratorVendorCerebras = "Cerebras" // GPGPU vendors - CN - AcceleratorVendorHygon = "Hygon-DCU" - AcceleratorVendorMetaX = "Meta-X" + AcceleratorVendorHygon = "Hygon" + AcceleratorVendorMetaX = "MetaX" AcceleratorVendorMThreads = "MThreads" - AcceleratorVendorBiren = "BirenGPU" - AcceleratorVendorAlibabaTHead = "THead-PPU" + AcceleratorVendorBiren = "Biren" + AcceleratorVendorAlibabaTHead = "THead" // DSA vendors - CN - AcceleratorVendorHuaweiAscendNPU = "Ascend-NPU" - AcceleratorVendorCambricon = "Cambricon-MLU" - AcceleratorVendorEnflame = "Enflame-XPU" - AcceleratorVendorKunlunX = "KunlunXin-XPU" + AcceleratorVendorHuaweiAscendNPU = "Ascend" + AcceleratorVendorCambricon = "Cambricon" + AcceleratorVendorEnflame = "Enflame" + AcceleratorVendorKunlunX = "KunlunXin" AcceleratorVendorUnknown = "Unknown" ) diff --git a/internal/controller/gpunode_controller.go b/internal/controller/gpunode_controller.go index c1f34937..5fb0d3dd 100644 --- 
a/internal/controller/gpunode_controller.go +++ b/internal/controller/gpunode_controller.go @@ -188,7 +188,7 @@ func (r *GPUNodeReconciler) checkStatusAndUpdateVirtualCapacity( return nil } else { - gpuModels, err := gpuallocator.RefreshGPUNodeCapacity(ctx, r.Client, node, poolObj, r.Allocator) + gpuModels, err := gpuallocator.RefreshGPUNodeCapacity(ctx, r.Client, node, poolObj, r.Allocator, coreNode) if err != nil { return err } diff --git a/internal/gpuallocator/gpuallocator_test.go b/internal/gpuallocator/gpuallocator_test.go index bb3a494d..496818d3 100644 --- a/internal/gpuallocator/gpuallocator_test.go +++ b/internal/gpuallocator/gpuallocator_test.go @@ -97,7 +97,7 @@ var _ = Describe("GPU Allocator", func() { if err := k8sClient.Get(ctx, types.NamespacedName{Name: "test-pool"}, pool); err != nil { Expect(err).NotTo(HaveOccurred()) } - _, _ = RefreshGPUNodeCapacity(ctx, k8sClient, gpuNode, pool, allocator) + _, _ = RefreshGPUNodeCapacity(ctx, k8sClient, gpuNode, pool, allocator, nil) // Verify resources were reduced on the allocated GPU gpu := getGPU(gpus[0].Name) diff --git a/internal/gpuallocator/node_capacity.go b/internal/gpuallocator/node_capacity.go index cdeb9141..56b79ace 100644 --- a/internal/gpuallocator/node_capacity.go +++ b/internal/gpuallocator/node_capacity.go @@ -6,15 +6,20 @@ import ( tfv1 "github.com/NexusGPU/tensor-fusion/api/v1" "github.com/NexusGPU/tensor-fusion/internal/constants" + "github.com/NexusGPU/tensor-fusion/internal/utils" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/kubernetes/pkg/util/taints" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" ) func RefreshGPUNodeCapacity( ctx context.Context, k8sClient client.Client, node *tfv1.GPUNode, pool *tfv1.GPUPool, allocator *GpuAllocator, + coreNode *corev1.Node, ) ([]string, error) { gpuList := &tfv1.GPUList{} if err := k8sClient.List(ctx, gpuList, client.MatchingLabels{constants.LabelKeyOwner: node.Name}); err != nil { @@ -76,6 +81,31 @@ func RefreshGPUNodeCapacity( if err != nil { return nil, fmt.Errorf("failed to update GPU node status: %w", err) } + + // check whether the K8S node taint needs to be updated + if utils.IsProgressiveMigration() && coreNode != nil { + taint := &corev1.Taint{ + Key: constants.NodeUsedByTaintKey, + Effect: corev1.TaintEffectNoSchedule, + Value: constants.TensorFusionSystemName, + } + needUpdateNode := false + if node.Status.AvailableVRAM.Equal(node.Status.TotalVRAM) && node.Status.AvailableTFlops.Equal(node.Status.TotalTFlops) { + // node is fully idle, remove the taint if present + coreNode, needUpdateNode, _ = taints.RemoveTaint(coreNode, taint) + } else if !taints.TaintExists(coreNode.Spec.Taints, taint) { + // node has TensorFusion allocations, add the taint if missing + coreNode, needUpdateNode, _ = taints.AddOrUpdateTaint(coreNode, taint) + } + if needUpdateNode { + log.FromContext(ctx).Info("Updating K8S node taints for isolation of tensor-fusion and non-tensor-fusion used nodes", + "node", coreNode.Name, "taint", taint.Key) + err := k8sClient.Update(ctx, coreNode) + if err != nil { + return nil, fmt.Errorf("failed to update K8S node: %w", err) + } + } + } } return gpuModels, nil } diff --git a/internal/indexallocator/indexallocator.go b/internal/indexallocator/indexallocator.go new file mode 100644 index 00000000..31bef633 --- /dev/null +++ b/internal/indexallocator/indexallocator.go @@ -0,0 +1,97 @@ +package indexallocator + +import ( + "context" + "fmt" + "sync/atomic" + +
"github.com/NexusGPU/tensor-fusion/internal/constants" + "github.com/NexusGPU/tensor-fusion/internal/utils" + v1 "k8s.io/api/core/v1" + "k8s.io/client-go/util/retry" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/manager" +) + +const ( + IndexRangeStart = 1 + IndexRangeEnd = 512 +) + +// IndexAllocator manages allocation of 1-512 temporary indices for Pod-to-DevicePlugin communication +// Uses a simple atomic counter that increments from 1 to 512, then wraps around to 1 +// No bitmap tracking needed - index reuse is acceptable after 512 cycles +type IndexAllocator struct { + IsLeader bool + + // Atomic counter for index allocation (1-512, wraps around) + currentIndex int64 + + Client client.Client + + ctx context.Context +} + +func NewIndexAllocator(ctx context.Context, client client.Client) (*IndexAllocator, error) { + if client == nil { + return nil, fmt.Errorf("client cannot be nil") + } + + allocator := &IndexAllocator{ + Client: client, + IsLeader: false, + currentIndex: 0, // Will start from 1 on first assignment + ctx: ctx, + } + + return allocator, nil +} + +func (s *IndexAllocator) SetupWithManager(ctx context.Context, mgr manager.Manager) <-chan struct{} { + readyCh := make(chan struct{}, 1) + _ = mgr.Add(manager.RunnableFunc(func(ctx context.Context) error { + <-mgr.Elected() + s.IsLeader = true + leaderInfo := &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: constants.LeaderInfoConfigMapName, + Namespace: utils.CurrentNamespace(), + }, + } + err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { + _, err := controllerutil.CreateOrUpdate(ctx, s.Client, leaderInfo, func() error { + leaderInfo.Data = map[string]string{ + constants.LeaderInfoConfigMapLeaderIPKey: utils.CurrentIP(), + } + return nil + }) + return err + }) + if err != nil { + log.FromContext(ctx).Error(err, "Failed to update leader IP info in ConfigMap") + } + + readyCh <- struct{}{} + return nil + })) + return readyCh +} + +// AssignIndex assigns a temporary index (1-512) for Pod-to-DevicePlugin communication +// Uses atomic increment to ensure thread-safe assignment +// Index wraps around from 512 to 1 (simple modulo operation) +func (s *IndexAllocator) AssignIndex(podName string) (int, error) { + if !s.IsLeader { + return 0, fmt.Errorf("only leader can assign index") + } + + // Atomic increment and wrap around + next := atomic.AddInt64(&s.currentIndex, 1) + index := int((next-1)%IndexRangeEnd) + IndexRangeStart + + return index, nil +} diff --git a/internal/indexallocator/indexallocator_test.go b/internal/indexallocator/indexallocator_test.go new file mode 100644 index 00000000..c9d7069e --- /dev/null +++ b/internal/indexallocator/indexallocator_test.go @@ -0,0 +1,78 @@ +package indexallocator + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "sigs.k8s.io/controller-runtime/pkg/client/fake" +) + +func TestIndexAllocator_AssignIndex(t *testing.T) { + scheme := fake.NewClientBuilder().WithScheme(fake.NewClientBuilder().Build().Scheme()).Build().Scheme() + client := fake.NewClientBuilder().WithScheme(scheme).Build() + + ctx := context.Background() + allocator, err := NewIndexAllocator(ctx, client) + require.NoError(t, err) + allocator.IsLeader = true + + // Test assigning first index + index1, err := allocator.AssignIndex("pod-1") + 
assert.NoError(t, err) + assert.Equal(t, 1, index1) + + // Test assigning second index + index2, err := allocator.AssignIndex("pod-2") + assert.NoError(t, err) + assert.Equal(t, 2, index2) + + // Test assigning multiple indices - verify ascending order + for i := 3; i <= 10; i++ { + index, err := allocator.AssignIndex("pod-" + string(rune(i))) + assert.NoError(t, err) + assert.Equal(t, i, index, "index should be assigned in ascending order") + } +} + +func TestIndexAllocator_AssignIndex_IncrementalOrder(t *testing.T) { + scheme := fake.NewClientBuilder().WithScheme(fake.NewClientBuilder().Build().Scheme()).Build().Scheme() + client := fake.NewClientBuilder().WithScheme(scheme).Build() + + ctx := context.Background() + allocator, err := NewIndexAllocator(ctx, client) + require.NoError(t, err) + allocator.IsLeader = true + + // Test that indices are assigned in incremental order (1, 2, 3, ...) + expectedIndex := 1 + for i := 0; i < 20; i++ { + index, err := allocator.AssignIndex("pod-" + string(rune(i))) + assert.NoError(t, err) + assert.Equal(t, expectedIndex, index, "index should be assigned in ascending order") + expectedIndex++ + } +} + +func TestIndexAllocator_WrapAround(t *testing.T) { + scheme := fake.NewClientBuilder().WithScheme(fake.NewClientBuilder().Build().Scheme()).Build().Scheme() + client := fake.NewClientBuilder().WithScheme(scheme).Build() + + ctx := context.Background() + allocator, err := NewIndexAllocator(ctx, client) + require.NoError(t, err) + allocator.IsLeader = true + + // Assign indices until we reach 512 + for i := 1; i <= 512; i++ { + index, err := allocator.AssignIndex("pod-" + string(rune(i))) + assert.NoError(t, err) + assert.Equal(t, i, index) + } + + // Next assignment should wrap around to 1 + index, err := allocator.AssignIndex("pod-513") + assert.NoError(t, err) + assert.Equal(t, 1, index, "index should wrap around from 512 to 1") +} diff --git a/internal/portallocator/portallocator.go b/internal/portallocator/portallocator.go index 1fba53df..1a050eee 100644 --- a/internal/portallocator/portallocator.go +++ b/internal/portallocator/portallocator.go @@ -152,19 +152,7 @@ func (s *PortAllocator) SetupWithManager(ctx context.Context, mgr manager.Manage } func (s *PortAllocator) GetLeaderIP() string { - leaderInfo := &v1.ConfigMap{} - err := s.Client.Get(context.Background(), client.ObjectKey{ - Name: constants.LeaderInfoConfigMapName, - Namespace: utils.CurrentNamespace(), - }, leaderInfo) - if err != nil { - log.FromContext(context.Background()).Error(err, "Failed to get leader IP info from ConfigMap") - return "" - } - if leaderInfo.Data == nil { - return "" - } - return leaderInfo.Data[constants.LeaderInfoConfigMapLeaderIPKey] + return utils.GetLeaderIP(s.Client) } // AssignHostPort always called by operator itself, thus no Leader-Follower inconsistency issue diff --git a/internal/scheduler/gpuresources/gpuresources.go b/internal/scheduler/gpuresources/gpuresources.go index 3f3b6f12..c3759fad 100644 --- a/internal/scheduler/gpuresources/gpuresources.go +++ b/internal/scheduler/gpuresources/gpuresources.go @@ -431,6 +431,15 @@ func (s *GPUFit) Reserve(ctx context.Context, state fwk.CycleState, pod *v1.Pod, if err != nil { return fwk.NewStatus(fwk.Error, err.Error()) } + + // Index is already assigned in webhook stage, scheduler cannot modify Pod + // Just verify that index annotation exists for logging + if pod.Annotations != nil { + if indexStr, exists := pod.Annotations[constants.PodIndexAnnotation]; exists && indexStr != "" { + s.logger.V(5).Info("Pod 
index already assigned in webhook", "pod", pod.Name, "index", indexStr) + } + } + return fwk.NewStatus(fwk.Success, "") } @@ -467,11 +476,12 @@ func (s *GPUFit) PostBind(ctx context.Context, state fwk.CycleState, pod *v1.Pod // write the allocated GPU info to Pod in bindingCycle, before default binder changing the Pod nodeName info gpuIDs := strings.Join(gpuSchedulingResult.(*GPUSchedulingStateData).FinalGPUs, ",") s.logger.Info("PostBinding pod for GPU resources", "pod", pod.Name, "node", nodeName, "gpuIDs", gpuIDs) + + // Patch GPU device IDs annotation patch := []byte(`[{ "op": "add", "path": "/metadata/annotations/` + utils.EscapeJSONPointer(constants.GPUDeviceIDsAnnotation) + `", "value": "` + gpuIDs + `"}]`) - err = s.client.Patch(s.ctx, pod, client.RawPatch(types.JSONPatchType, patch)) if err != nil { s.logger.Error(err, "failed to patch gpu device ids", "pod", pod.Name) diff --git a/internal/server/router/assign_index.go b/internal/server/router/assign_index.go new file mode 100644 index 00000000..b3f0c6a0 --- /dev/null +++ b/internal/server/router/assign_index.go @@ -0,0 +1,63 @@ +package router + +import ( + "context" + "fmt" + "net/http" + + "github.com/NexusGPU/tensor-fusion/internal/constants" + "github.com/NexusGPU/tensor-fusion/internal/indexallocator" + "github.com/NexusGPU/tensor-fusion/internal/utils" + "github.com/gin-gonic/gin" + v1 "k8s.io/api/authentication/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/log" +) + +const assignIndexTokenReviewName = "assign-index-token-review" + +type AssignIndexRouter struct { + allocator *indexallocator.IndexAllocator +} + +func NewAssignIndexRouter(ctx context.Context, allocator *indexallocator.IndexAllocator) (*AssignIndexRouter, error) { + return &AssignIndexRouter{allocator: allocator}, nil +} + +func (r *AssignIndexRouter) AssignIndex(ctx *gin.Context) { + podName := ctx.Query("podName") + token := ctx.Request.Header.Get(constants.AuthorizationHeader) + + if token == "" { + log.FromContext(ctx).Error(nil, "assigned index failed, missing token", "podName", podName) + ctx.String(http.StatusUnauthorized, "missing authorization header") + return + } + tokenReview := &v1.TokenReview{ + ObjectMeta: metav1.ObjectMeta{ + Name: assignIndexTokenReviewName, + }, + Spec: v1.TokenReviewSpec{ + Token: token, + }, + } + if err := r.allocator.Client.Create(ctx, tokenReview); err != nil { + log.FromContext(ctx).Error(err, "assigned index failed, auth endpoint error", "podName", podName) + ctx.String(http.StatusInternalServerError, "auth endpoint error") + return + } + if !tokenReview.Status.Authenticated || tokenReview.Status.User.Username != utils.GetSelfServiceAccountNameFull() { + log.FromContext(ctx).Error(nil, "assigned index failed, token invalid", "podName", podName) + ctx.String(http.StatusUnauthorized, "token authentication failed") + return + } + + index, err := r.allocator.AssignIndex(podName) + if err != nil { + log.FromContext(ctx).Error(err, "assigned index failed", "podName", podName) + ctx.String(http.StatusInternalServerError, err.Error()) + return + } + log.FromContext(ctx).Info("assigned index successfully", "podName", podName, "index", index) + ctx.String(http.StatusOK, fmt.Sprintf("%d", index)) +} diff --git a/internal/server/server.go b/internal/server/server.go index f002e446..432fb61d 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -13,6 +13,7 @@ import ( func NewHTTPServer( cr *router.ConnectionRouter, ahp *router.AssignHostPortRouter, + ai 
*router.AssignIndexRouter, alc *router.AllocatorInfoRouter, leaderChan <-chan struct{}, ) *gin.Engine { @@ -39,6 +40,19 @@ func NewHTTPServer( // suspend API call utils it becomes leader ahp.AssignHostPort(ctx) }) + apiGroup.GET("/assign-index", func(ctx *gin.Context) { + if leaderChan == nil { + ctx.String(http.StatusServiceUnavailable, "current instance is not the leader") + return + } + select { + case <-ctx.Done(): + return + case <-leaderChan: + } + // suspend API call until it becomes leader + ai.AssignIndex(ctx) + }) apiGroup.GET("/provision", func(ctx *gin.Context) { controller.ProvisioningToggle = ctx.Query("enable") == "true" }) diff --git a/internal/utils/config.go b/internal/utils/config.go index ee4a228c..b7442644 100644 --- a/internal/utils/config.go +++ b/internal/utils/config.go @@ -1,7 +1,7 @@ package utils import ( - context "context" + "context" "encoding/base64" "encoding/json" "fmt" @@ -14,7 +14,9 @@ import ( "github.com/lithammer/shortuuid/v4" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/yaml" ) @@ -197,3 +199,21 @@ func IsProgressiveMigration() bool { func SetProgressiveMigration(isProgressiveMigration bool) { nvidiaOperatorProgressiveMigrationEnv = isProgressiveMigration } + +// GetLeaderIP retrieves the leader IP from the ConfigMap +// This is used by both PortAllocator and IndexAllocator for distributed allocation +func GetLeaderIP(client client.Client) string { + leaderInfo := &corev1.ConfigMap{} + err := client.Get(context.Background(), types.NamespacedName{ + Name: constants.LeaderInfoConfigMapName, + Namespace: CurrentNamespace(), + }, leaderInfo) + if err != nil { + ctrl.Log.V(5).Info("Failed to get leader IP info from ConfigMap", "error", err) + return "" + } + if leaderInfo.Data == nil { + return "" + } + return leaderInfo.Data[constants.LeaderInfoConfigMapLeaderIPKey] +} diff --git a/internal/webhook/v1/pod_webhook.go b/internal/webhook/v1/pod_webhook.go index 71243776..5d73f0e6 100644 --- a/internal/webhook/v1/pod_webhook.go +++ b/internal/webhook/v1/pod_webhook.go @@ -29,6 +29,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/util/strategicpatch" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -38,6 +39,7 @@ import ( tfv1 "github.com/NexusGPU/tensor-fusion/api/v1" "github.com/NexusGPU/tensor-fusion/internal/cloudprovider/pricing" "github.com/NexusGPU/tensor-fusion/internal/constants" + "github.com/NexusGPU/tensor-fusion/internal/indexallocator" "github.com/NexusGPU/tensor-fusion/internal/portallocator" "github.com/NexusGPU/tensor-fusion/internal/utils" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -46,24 +48,26 @@ import ( var httpClient = &http.Client{Timeout: 10 * time.Second} // SetupPodWebhookWithManager registers the webhook for Pod in the manager. 
-func SetupPodWebhookWithManager(mgr ctrl.Manager, portAllocator *portallocator.PortAllocator, pricingProvider pricing.PricingProvider) error { +func SetupPodWebhookWithManager(mgr ctrl.Manager, portAllocator *portallocator.PortAllocator, indexAllocator *indexallocator.IndexAllocator, pricingProvider pricing.PricingProvider) error { webhookServer := mgr.GetWebhookServer() webhookServer.Register("/mutate-v1-pod", &admission.Webhook{ Handler: &TensorFusionPodMutator{ - decoder: admission.NewDecoder(mgr.GetScheme()), - Client: mgr.GetClient(), - portAllocator: portAllocator, + decoder: admission.NewDecoder(mgr.GetScheme()), + Client: mgr.GetClient(), + portAllocator: portAllocator, + indexAllocator: indexAllocator, }, }) return nil } type TensorFusionPodMutator struct { - Client client.Client - decoder admission.Decoder - portAllocator *portallocator.PortAllocator + Client client.Client + decoder admission.Decoder + portAllocator *portallocator.PortAllocator + indexAllocator *indexallocator.IndexAllocator } // Handle implements admission.Handler interface. @@ -175,12 +179,16 @@ func (m *TensorFusionPodMutator) Handle(ctx context.Context, req admission.Reque // Add priorityClass if contains higher QoS level and Pod priority class not specified if pod.Spec.PriorityClassName == "" && (tfInfo.Profile.Qos == tfv1.QoSHigh || tfInfo.Profile.Qos == tfv1.QoSCritical) { - pod.Spec.PriorityClassName = constants.TensorFusionSystemName + string(tfInfo.Profile.Qos) + pod.Spec.PriorityClassName = fmt.Sprintf("%s-%s", + constants.TensorFusionSystemName, string(tfInfo.Profile.Qos)) + // Remove priority field if PriorityClassName is set, as Kubernetes Priority admission controller + // will compute priority from PriorityClassName and doesn't allow both fields to be set + pod.Spec.Priority = nil } // Inject initContainer and env variables patches, err := m.patchTFClient( - pod, pool, tfInfo.Profile.IsLocalGPU, currentBytes, containerIndices, tfInfo.Profile.SidecarWorker, + ctx, pod, pool, tfInfo.Profile.IsLocalGPU, currentBytes, containerIndices, tfInfo.Profile.SidecarWorker, ) if err != nil { log.Error(err, "failed to patch tf client", "pod", req.Name, "namespace", req.Namespace) @@ -265,6 +273,7 @@ func (m *TensorFusionPodMutator) createOrUpdateWorkload( } func (m *TensorFusionPodMutator) patchTFClient( + ctx context.Context, pod *corev1.Pod, pool *tfv1.GPUPool, isLocalGPU bool, @@ -281,6 +290,13 @@ func (m *TensorFusionPodMutator) patchTFClient( assignPodLabelsAndAnnotations(isLocalGPU, pod, pool) + // Assign index once per pod (before processing containers) + // Index must be assigned in webhook stage since scheduler cannot modify Pod + // This is a special index resource (1-512), not a real device resource + // Index is assigned in ascending order (1, 2, 3, ...) 
via distributed lock (leader election) + index := m.assignDeviceAllocationIndex(ctx, pod) + log.FromContext(ctx).Info("assigned device allocation index successfully", "index", index, "pod", pod.Name) + for _, containerIndex := range containerIndices { container := &pod.Spec.Containers[containerIndex] containerJSON, err := json.Marshal(container) @@ -307,6 +323,17 @@ func (m *TensorFusionPodMutator) patchTFClient( removeNativeGPULimits(container) + // Inject tensor-fusion.ai/index resource for Device Plugin communication + // This is a special index resource (not a real device), used for Pod-to-DevicePlugin communication + if container.Resources.Requests == nil { + container.Resources.Requests = make(corev1.ResourceList) + } + if container.Resources.Limits == nil { + container.Resources.Limits = make(corev1.ResourceList) + } + // Limit is set to actual index value (1-512) for Device Plugin to match Pod + container.Resources.Limits[constants.PodIndexAnnotation] = resource.MustParse(strconv.Itoa(index)) + if !isLocalGPU { addConnectionForRemoteFixedReplicaVirtualGPU(pod, container, clientConfig) } else if isSidecarWorker { @@ -351,6 +378,46 @@ func (m *TensorFusionPodMutator) patchTFClient( return patches, nil } +func (m *TensorFusionPodMutator) assignDeviceAllocationIndex(ctx context.Context, pod *corev1.Pod) int { + var index int + var indexErr error + podIdentifier := pod.Name + if podIdentifier == "" { + // For Deployment/StatefulSet created pods, Name might be empty, use GenerateName + UID + podIdentifier = pod.GenerateName + string(pod.UID) + } + + if m.indexAllocator != nil && m.indexAllocator.IsLeader { + index, indexErr = m.indexAllocator.AssignIndex(podIdentifier) + if indexErr != nil { + log := log.FromContext(ctx) + log.Error(indexErr, "failed to assign index for pod", "pod", podIdentifier) + index = 0 + } + } else if m.indexAllocator != nil && !m.indexAllocator.IsLeader { + // If not leader, get index from leader via HTTP API (similar to port allocation) + // This ensures global increment across distributed webhook instances + index, indexErr = m.assignIndexFromLeader(ctx, pod) + if indexErr != nil { + log := log.FromContext(ctx) + log.Error(indexErr, "failed to assign index from leader", "pod", podIdentifier) + index = 0 + } + } else { + // No allocator available, use 0 as fallback + index = 0 + } + + // Set annotation for matching in Device Plugin + if pod.Annotations == nil { + pod.Annotations = make(map[string]string) + } + if index > 0 { + pod.Annotations[constants.PodIndexAnnotation] = strconv.Itoa(index) + } + return index +} + // Convert the strategic merge patch to JSON func calculatePodPatch(currentBytes []byte, pod *corev1.Pod, clientConfig *tfv1.ClientConfig, isLocalGPU bool) ([]jsonpatch.JsonPatchOperation, error) { var patchBytes []byte @@ -392,6 +459,14 @@ func assignPodLabelsAndAnnotations(isLocalGPU bool, pod *corev1.Pod, pool *tfv1. 
pod.Labels[constants.LabelComponent] = constants.ComponentWorker pod.Annotations[constants.EmbeddedWorkerAnnotation] = constants.TrueStringValue // no need to add port in local gpu mode, communication is done through shared memory in the same process + + // Add toleration for TensorFusion nodes + pod.Spec.Tolerations = append(pod.Spec.Tolerations, corev1.Toleration{ + Key: constants.NodeUsedByTaintKey, + Operator: corev1.TolerationOpEqual, + Value: constants.TensorFusionSystemName, + Effect: corev1.TaintEffectNoSchedule, + }) } else { pod.Labels[constants.LabelComponent] = constants.ComponentClient } @@ -493,8 +568,7 @@ func (m *TensorFusionPodMutator) getPortNumber(pod *corev1.Pod) int { } func (m *TensorFusionPodMutator) assignClusterHostPortFromLeader(pod *corev1.Pod) (int, error) { - - leaderIP := m.portAllocator.GetLeaderIP() + leaderIP := utils.GetLeaderIP(m.Client) if leaderIP == "" { return 0, fmt.Errorf("operator leader IP not found") } @@ -525,6 +599,42 @@ func (m *TensorFusionPodMutator) assignClusterHostPortFromLeader(pod *corev1.Pod return strconv.Atoi(string(body)) } +func (m *TensorFusionPodMutator) assignIndexFromLeader(ctx context.Context, pod *corev1.Pod) (int, error) { + leaderIP := utils.GetLeaderIP(m.Client) + if leaderIP == "" { + return 0, fmt.Errorf("operator leader IP not found") + } + + podIdentifier := pod.Name + if podIdentifier == "" { + podIdentifier = pod.GenerateName + string(pod.UID) + } + urlStr := fmt.Sprintf("http://%s:8080/assign-index?podName=%s", leaderIP, podIdentifier) + req, err := http.NewRequestWithContext(ctx, "GET", urlStr, nil) + if err != nil { + return 0, err + } + req.Header.Set(constants.AuthorizationHeader, "Bearer "+utils.ReadServiceAccountToken()) + resp, err := httpClient.Do(req) + if err != nil { + return 0, fmt.Errorf("failed to assign index: %w", err) + } + defer func() { + _ = resp.Body.Close() + }() + + if resp.StatusCode != http.StatusOK { + return 0, fmt.Errorf("index allocation failed: %s", resp.Status) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + return 0, fmt.Errorf("failed to read allocation response: %w", err) + } + + return strconv.Atoi(string(body)) +} + func calculateQoSLevel(profile *tfv1.WorkloadProfileSpec, pool *tfv1.GPUPool) tfv1.QoSLevel { // when not set, assign default QoS if profile.Qos == "" { diff --git a/internal/webhook/v1/pod_webhook_test.go b/internal/webhook/v1/pod_webhook_test.go index ca1dc4fc..d59d2ebf 100644 --- a/internal/webhook/v1/pod_webhook_test.go +++ b/internal/webhook/v1/pod_webhook_test.go @@ -896,7 +896,7 @@ var _ = Describe("TensorFusionPodMutator", func() { currentBytes, err := json.Marshal(pod) Expect(err).NotTo(HaveOccurred()) - patch, err := mutator.patchTFClient(pod, pool, false, currentBytes, []int{0}, false) + patch, err := mutator.patchTFClient(context.Background(), pod, pool, false, currentBytes, []int{0}, false) Expect(err).NotTo(HaveOccurred()) Expect(patch).NotTo(BeEmpty()) // There should be at least 2 patches (initContainers and the container env patches) diff --git a/internal/webhook/v1/webhook_suite_test.go b/internal/webhook/v1/webhook_suite_test.go index 26a6685d..5e96bd22 100644 --- a/internal/webhook/v1/webhook_suite_test.go +++ b/internal/webhook/v1/webhook_suite_test.go @@ -141,7 +141,7 @@ var _ = BeforeSuite(func() { PortRangeStartCluster: 42000, PortRangeEndCluster: 62000, BitmapCluster: make([]uint64, (62000-42000)/64+1), - }, mockPricingProvider) + }, nil, mockPricingProvider) Expect(err).NotTo(HaveOccurred()) // +kubebuilder:scaffold:webhook diff 
--git a/internal/worker/worker.go b/internal/worker/worker.go index d803c6e2..18874a36 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -60,6 +60,14 @@ func (wg *WorkerGenerator) GenerateWorkerPod( spec.EnableServiceLinks = ptr.To(false) spec.SchedulerName = constants.SchedulerName + // tolerate nodes that are used by the TensorFusion system + spec.Tolerations = append(spec.Tolerations, v1.Toleration{ + Key: constants.NodeUsedByTaintKey, + Operator: v1.TolerationOpEqual, + Value: constants.TensorFusionSystemName, + Effect: v1.TaintEffectNoSchedule, + }) + // Add labels to identify this pod as part of the workload labels, annotations := utils.AppendTFWorkerLabelsAndAnnotationsAfterTemplate(podTmpl, workload, containerName)
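Note (illustration only, not part of the patch): the snippet below is a minimal, standalone sketch of the wrap-around arithmetic that IndexAllocator.AssignIndex relies on, using the same 1-512 range as IndexRangeStart/IndexRangeEnd in the diff; the package name and the nextIndex helper are placeholders introduced here, not code from this repository.

// wrapdemo: verifies that an atomic counter mapped onto 1..512 yields
// 1, 2, ..., 512 and then wraps back to 1, matching AssignIndex.
package main

import (
	"fmt"
	"sync/atomic"
)

const (
	indexRangeStart = 1   // mirrors IndexRangeStart
	indexRangeEnd   = 512 // mirrors IndexRangeEnd
)

var currentIndex int64

// nextIndex mirrors the body of AssignIndex: atomically increment the
// counter, then fold it into the 1..512 range with a modulo.
func nextIndex() int {
	next := atomic.AddInt64(&currentIndex, 1)
	return int((next-1)%indexRangeEnd) + indexRangeStart
}

func main() {
	fmt.Println(nextIndex()) // 1
	for i := 0; i < 510; i++ {
		nextIndex() // advance the counter to 511
	}
	fmt.Println(nextIndex()) // 512
	fmt.Println(nextIndex()) // wraps back to 1
}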