2 changes: 1 addition & 1 deletion charts/tensor-fusion/Chart.yaml
@@ -15,7 +15,7 @@ type: application
 # This is the chart version. This version number should be incremented each time you make changes
 # to the chart and its templates, including the app version.
 # Versions are expected to follow Semantic Versioning (https://semver.org/)
-version: 1.7.4
+version: 1.7.5
 
 # This is the version number of the application being deployed. This version number should be
 # incremented each time you make changes to the application. Versions are not expected to
2 changes: 2 additions & 0 deletions charts/tensor-fusion/values.yaml
@@ -208,6 +208,8 @@ schedulerConfig:
         totalIntranetBandWidthGBps: 100
     - name: NodeResourcesFit
       args:
+        ignoredResourceGroups:
+          - "tensor-fusion.ai"
         scoringStrategy:
           resources:
             - name: cpu
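Note: `ignoredResourceGroups` is an upstream `NodeResourcesFitArgs` field. The NodeResourcesFit plugin skips any extended resource whose group prefix (the part before `/`) is listed, so requests under the `tensor-fusion.ai` group no longer count against node allocatable in the default fit check and are left to tensor-fusion's own scheduling logic. A minimal sketch of the group-matching rule; the helper and the `tensor-fusion.ai/tflops` resource name are illustrative, not taken from this diff:

```go
package main

import (
	"fmt"
	"slices"
	"strings"
)

// isIgnoredResource mirrors the semantics of NodeResourcesFit's
// ignoredResourceGroups: a resource belongs to a group when its name has the
// form "<group>/<name>".
func isIgnoredResource(resourceName string, ignoredGroups []string) bool {
	group, _, found := strings.Cut(resourceName, "/")
	if !found {
		return false // native resources like "cpu" carry no group prefix
	}
	return slices.Contains(ignoredGroups, group)
}

func main() {
	ignored := []string{"tensor-fusion.ai"}
	fmt.Println(isIgnoredResource("tensor-fusion.ai/tflops", ignored)) // true
	fmt.Println(isIgnoredResource("cpu", ignored))                     // false
}
```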
26 changes: 22 additions & 4 deletions cmd/main.go
@@ -36,6 +36,7 @@ import (
 	"github.com/NexusGPU/tensor-fusion/internal/constants"
 	"github.com/NexusGPU/tensor-fusion/internal/controller"
 	"github.com/NexusGPU/tensor-fusion/internal/gpuallocator"
+	"github.com/NexusGPU/tensor-fusion/internal/indexallocator"
 	"github.com/NexusGPU/tensor-fusion/internal/metrics"
 	"github.com/NexusGPU/tensor-fusion/internal/portallocator"
 	"github.com/NexusGPU/tensor-fusion/internal/scheduler/expander"
@@ -232,17 +233,25 @@ func main() {
 	// Initialize GPU allocator and set up watches
 	allocator, portAllocator := startTensorFusionAllocators(ctx, mgr)
 
+	// Initialize Index allocator for Device Plugin communication
+	indexAllocator, err := indexallocator.NewIndexAllocator(ctx, mgr.GetClient())
+	if err != nil {
+		setupLog.Error(err, "unable to set up index allocator")
+		os.Exit(1)
+	}
+	_ = indexAllocator.SetupWithManager(ctx, mgr)
+
 	startAutoScaler(mgr, allocator)
 
 	// Create pricing provider for webhook
 	pricingProvider := pricing.NewStaticPricingProvider()
-	startWebhook(mgr, portAllocator, pricingProvider)
+	startWebhook(mgr, portAllocator, indexAllocator, pricingProvider)
 
 	scheduler, nodeExpander := startScheduler(ctx, allocator, mgr, k8sVersion)
 
 	startCustomResourceController(ctx, mgr, metricsRecorder, allocator, portAllocator, nodeExpander)
 
-	startHttpServerForTFClient(ctx, kc, portAllocator, allocator, scheduler, mgr.Elected())
+	startHttpServerForTFClient(ctx, kc, portAllocator, indexAllocator, allocator, scheduler, mgr.Elected())
 
 	// +kubebuilder:scaffold:builder
 	addHealthCheckAPI(mgr)
@@ -291,6 +300,7 @@ func startHttpServerForTFClient(
 	ctx context.Context,
 	kc *rest.Config,
 	portAllocator *portallocator.PortAllocator,
+	indexAllocator *indexallocator.IndexAllocator,
 	allocator *gpuallocator.GpuAllocator,
 	scheduler *scheduler.Scheduler,
 	leaderChan <-chan struct{},
@@ -310,12 +320,19 @@
 		setupLog.Error(err, "failed to create assign host port router")
 		os.Exit(1)
 	}
+	assignIndexRouter, err := router.NewAssignIndexRouter(ctx, indexAllocator)
+	if err != nil {
+		setupLog.Error(err, "failed to create assign index router")
+		os.Exit(1)
+	}
 	allocatorInfoRouter, err := router.NewAllocatorInfoRouter(ctx, allocator, scheduler)
 	if err != nil {
 		setupLog.Error(err, "failed to create allocator info router")
 		os.Exit(1)
 	}
-	httpServer := server.NewHTTPServer(connectionRouter, assignHostPortRouter, allocatorInfoRouter, leaderChan)
+	httpServer := server.NewHTTPServer(
+		connectionRouter, assignHostPortRouter, assignIndexRouter, allocatorInfoRouter, leaderChan,
+	)
 	go func() {
 		err := httpServer.Run()
 		if err != nil {
@@ -468,12 +485,13 @@ func startCustomResourceController(
 func startWebhook(
 	mgr manager.Manager,
 	portAllocator *portallocator.PortAllocator,
+	indexAllocator *indexallocator.IndexAllocator,
 	pricingProvider pricing.PricingProvider,
 ) {
 	if os.Getenv(constants.EnableWebhookEnv) == constants.FalseStringValue {
 		return
 	}
-	if err := webhookcorev1.SetupPodWebhookWithManager(mgr, portAllocator, pricingProvider); err != nil {
+	if err := webhookcorev1.SetupPodWebhookWithManager(mgr, portAllocator, indexAllocator, pricingProvider); err != nil {
 		setupLog.Error(err, "unable to create webhook", "webhook", "Pod")
 		os.Exit(1)
 	}
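The new allocator is threaded into both consumers: the Pod mutating webhook (to assign an index at admission time) and the client-facing HTTP server (to serve index requests from node agents). `router.NewAssignIndexRouter` itself is not part of this diff; a minimal sketch of what such a handler could look like, where the route shape and the `podName` query parameter are assumptions:

```go
package router

import (
	"fmt"
	"net/http"

	"github.com/NexusGPU/tensor-fusion/internal/indexallocator"
)

// assignIndexHandler is a hypothetical handler shape for the assign-index
// route; the real implementation is not shown in this diff.
func assignIndexHandler(alloc *indexallocator.IndexAllocator) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		podName := r.URL.Query().Get("podName") // assumed parameter name
		index, err := alloc.AssignIndex(podName)
		if err != nil {
			// AssignIndex returns an error on non-leader replicas
			http.Error(w, err.Error(), http.StatusServiceUnavailable)
			return
		}
		fmt.Fprintf(w, "%d", index) // respond with the bare index
	}
}
```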
2 changes: 2 additions & 0 deletions config/samples/scheduler-config.yaml
@@ -40,6 +40,8 @@ profiles:
         totalIntranetBandWidthGBps: 100
     - name: NodeResourcesFit
       args:
+        ignoredResourceGroups:
+          - "tensor-fusion.ai"
         scoringStrategy:
           resources:
             - name: cpu
4 changes: 4 additions & 0 deletions internal/constants/constants.go
@@ -92,6 +92,9 @@ const (
 	// Additional worker pod template is set by user with /worker-pod-template annotation
 	WorkerPodTemplateAnnotation = Domain + "/worker-pod-template"
 
+	// Pod index annotation for Device Plugin communication (1-512)
+	PodIndexAnnotation = Domain + "/index"
+
 	WorkloadModeAnnotation = Domain + "/workload-mode"
 	WorkloadModeDynamic    = "dynamic"
 	WorkloadModeFixed      = "fixed"
@@ -119,6 +122,7 @@ const (
 	TensorFusionPodCounterKeyAnnotation = Domain + "/pod-counter-key"
 	TensorFusionPodCountAnnotation      = Domain + "/tf-pod-count"
 	TensorFusionWorkerSuffix            = "-tf"
+	NodeUsedByTaintKey                  = Domain + "/used-by"
 
 	// For grey release
 	TensorFusionEnabledReplicasAnnotation = Domain + "/enabled-replicas"
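With `Domain` resolving to `tensor-fusion.ai` (as seen in the scheduler config above), the new keys are `tensor-fusion.ai/index` and `tensor-fusion.ai/used-by`. A sketch of how the mutating webhook might stamp the index annotation; the actual webhook change lives in a file not included in this diff, and `stampPodIndex` is a hypothetical helper:

```go
package webhook

import (
	"fmt"
	"strconv"

	"github.com/NexusGPU/tensor-fusion/internal/constants"
	"github.com/NexusGPU/tensor-fusion/internal/indexallocator"
	corev1 "k8s.io/api/core/v1"
)

// stampPodIndex shows where the annotation key would be used during pod
// mutation; illustrative only.
func stampPodIndex(alloc *indexallocator.IndexAllocator, pod *corev1.Pod) error {
	index, err := alloc.AssignIndex(pod.Name)
	if err != nil {
		return fmt.Errorf("assign pod index: %w", err)
	}
	if pod.Annotations == nil {
		pod.Annotations = map[string]string{}
	}
	// resolves to "tensor-fusion.ai/index" given Domain = "tensor-fusion.ai"
	pod.Annotations[constants.PodIndexAnnotation] = strconv.Itoa(index)
	return nil
}
```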
20 changes: 10 additions & 10 deletions internal/constants/vendors.go
@@ -8,22 +8,22 @@ const (
 
 	// DSA vendors - Global
 	AcceleratorVendorQualcomm  = "Qualcomm"
-	AcceleratorVendorAWSNeuron = "AWS-Neuron"
-	AcceleratorVendorGoogleTPU = "Google-TPU"
+	AcceleratorVendorAWSNeuron = "AWSNeuron"
+	AcceleratorVendorGoogleTPU = "Google"
 	AcceleratorVendorCerebras  = "Cerebras"
 
 	// GPGPU vendors - CN
-	AcceleratorVendorHygon        = "Hygon-DCU"
-	AcceleratorVendorMetaX        = "Meta-X"
+	AcceleratorVendorHygon        = "Hygon"
+	AcceleratorVendorMetaX        = "MetaX"
 	AcceleratorVendorMThreads     = "MThreads"
-	AcceleratorVendorBiren        = "BirenGPU"
-	AcceleratorVendorAlibabaTHead = "THead-PPU"
+	AcceleratorVendorBiren        = "Biren"
+	AcceleratorVendorAlibabaTHead = "THead"
 
 	// DSA vendors - CN
-	AcceleratorVendorHuaweiAscendNPU = "Ascend-NPU"
-	AcceleratorVendorCambricon       = "Cambricon-MLU"
-	AcceleratorVendorEnflame         = "Enflame-XPU"
-	AcceleratorVendorKunlunX         = "KunlunXin-XPU"
+	AcceleratorVendorHuaweiAscendNPU = "Ascend"
+	AcceleratorVendorCambricon       = "Cambricon"
+	AcceleratorVendorEnflame         = "Enflame"
+	AcceleratorVendorKunlunX         = "KunlunXin"
 
 	AcceleratorVendorUnknown = "Unknown"
 )
2 changes: 1 addition & 1 deletion internal/controller/gpunode_controller.go
@@ -188,7 +188,7 @@ func (r *GPUNodeReconciler) checkStatusAndUpdateVirtualCapacity(
 
 		return nil
 	} else {
-		gpuModels, err := gpuallocator.RefreshGPUNodeCapacity(ctx, r.Client, node, poolObj, r.Allocator)
+		gpuModels, err := gpuallocator.RefreshGPUNodeCapacity(ctx, r.Client, node, poolObj, r.Allocator, coreNode)
 		if err != nil {
 			return err
 		}
2 changes: 1 addition & 1 deletion internal/gpuallocator/gpuallocator_test.go
@@ -97,7 +97,7 @@ var _ = Describe("GPU Allocator", func() {
 			if err := k8sClient.Get(ctx, types.NamespacedName{Name: "test-pool"}, pool); err != nil {
 				Expect(err).NotTo(HaveOccurred())
 			}
-			_, _ = RefreshGPUNodeCapacity(ctx, k8sClient, gpuNode, pool, allocator)
+			_, _ = RefreshGPUNodeCapacity(ctx, k8sClient, gpuNode, pool, allocator, nil)
 
 			// Verify resources were reduced on the allocated GPU
 			gpu := getGPU(gpus[0].Name)
30 changes: 30 additions & 0 deletions internal/gpuallocator/node_capacity.go
@@ -6,15 +6,20 @@ import (
 
 	tfv1 "github.com/NexusGPU/tensor-fusion/api/v1"
 	"github.com/NexusGPU/tensor-fusion/internal/constants"
+	"github.com/NexusGPU/tensor-fusion/internal/utils"
+	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/equality"
 	"k8s.io/apimachinery/pkg/api/resource"
+	"k8s.io/kubernetes/pkg/util/taints"
 	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/log"
 )
 
 func RefreshGPUNodeCapacity(
 	ctx context.Context, k8sClient client.Client,
 	node *tfv1.GPUNode, pool *tfv1.GPUPool,
 	allocator *GpuAllocator,
+	coreNode *corev1.Node,
 ) ([]string, error) {
 	gpuList := &tfv1.GPUList{}
 	if err := k8sClient.List(ctx, gpuList, client.MatchingLabels{constants.LabelKeyOwner: node.Name}); err != nil {
@@ -76,6 +81,31 @@
 		if err != nil {
 			return nil, fmt.Errorf("failed to update GPU node status: %w", err)
 		}
+
+		// check whether the K8s node taint needs updating
+		if utils.IsProgressiveMigration() && coreNode != nil {
+			taint := &corev1.Taint{
+				Key:    constants.NodeUsedByTaintKey,
+				Effect: corev1.TaintEffectNoSchedule,
+				Value:  constants.TensorFusionSystemName,
+			}
+			needUpdateNode := false
+			if node.Status.AvailableVRAM.Equal(node.Status.TotalVRAM) && node.Status.AvailableTFlops.Equal(node.Status.TotalTFlops) {
+				// node has no tensor-fusion allocations: remove the taint if present
+				coreNode, needUpdateNode, _ = taints.RemoveTaint(coreNode, taint)
+			} else if !taints.TaintExists(coreNode.Spec.Taints, taint) {
+				// node is used by tensor-fusion: add the taint if missing
+				coreNode, needUpdateNode, _ = taints.AddOrUpdateTaint(coreNode, taint)
+			}
+			if needUpdateNode {
+				log.FromContext(ctx).Info("Updating K8S node taints for isolation of tensor-fusion and non-tensor-fusion used nodes",
+					"node", coreNode.Name, "taint", taint.Key)
+				err := k8sClient.Update(ctx, coreNode)
+				if err != nil {
+					return nil, fmt.Errorf("failed to update K8S node: %w", err)
+				}
+			}
+		}
 	}
 	return gpuModels, nil
 }
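The effect during progressive migration: a node whose available VRAM and TFlops equal its totals (nothing allocated by tensor-fusion) has the taint removed, while any node with tensor-fusion allocations is tainted `tensor-fusion.ai/used-by:NoSchedule`, keeping ordinary workloads off it. Tensor-fusion-managed pods would then need a matching toleration; a sketch, assuming the taint key resolves as shown in constants.go above:

```go
package main

import corev1 "k8s.io/api/core/v1"

// Toleration that tensor-fusion-managed pods would carry to schedule onto
// nodes tainted by RefreshGPUNodeCapacity; illustrative, not part of this diff.
var usedByToleration = corev1.Toleration{
	Key:      "tensor-fusion.ai/used-by", // constants.NodeUsedByTaintKey
	Operator: corev1.TolerationOpExists,  // tolerate any taint value
	Effect:   corev1.TaintEffectNoSchedule,
}
```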
97 changes: 97 additions & 0 deletions internal/indexallocator/indexallocator.go
@@ -0,0 +1,97 @@
+package indexallocator
+
+import (
+	"context"
+	"fmt"
+	"sync/atomic"
+
+	"github.com/NexusGPU/tensor-fusion/internal/constants"
+	"github.com/NexusGPU/tensor-fusion/internal/utils"
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/client-go/util/retry"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
+	"sigs.k8s.io/controller-runtime/pkg/log"
+	"sigs.k8s.io/controller-runtime/pkg/manager"
+)
+
+const (
+	IndexRangeStart = 1
+	IndexRangeEnd   = 512
+)
+
+// IndexAllocator manages allocation of 1-512 temporary indices for
+// Pod-to-DevicePlugin communication. It uses a simple atomic counter that
+// increments from 1 to 512, then wraps around to 1. No bitmap tracking is
+// needed: index reuse is acceptable after a full cycle of 512 assignments.
+type IndexAllocator struct {
+	IsLeader bool
+
+	// Atomic counter for index allocation (1-512, wraps around)
+	currentIndex int64
+
+	Client client.Client
+
+	ctx context.Context
+}
+
+func NewIndexAllocator(ctx context.Context, client client.Client) (*IndexAllocator, error) {
+	if client == nil {
+		return nil, fmt.Errorf("client cannot be nil")
+	}
+
+	allocator := &IndexAllocator{
+		Client:       client,
+		IsLeader:     false,
+		currentIndex: 0, // will start from 1 on first assignment
+		ctx:          ctx,
+	}
+
+	return allocator, nil
+}
+
+func (s *IndexAllocator) SetupWithManager(ctx context.Context, mgr manager.Manager) <-chan struct{} {
+	readyCh := make(chan struct{}, 1)
+	_ = mgr.Add(manager.RunnableFunc(func(ctx context.Context) error {
+		<-mgr.Elected()
+		s.IsLeader = true
+		leaderInfo := &v1.ConfigMap{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      constants.LeaderInfoConfigMapName,
+				Namespace: utils.CurrentNamespace(),
+			},
+		}
+		err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
+			_, err := controllerutil.CreateOrUpdate(ctx, s.Client, leaderInfo, func() error {
+				leaderInfo.Data = map[string]string{
+					constants.LeaderInfoConfigMapLeaderIPKey: utils.CurrentIP(),
+				}
+				return nil
+			})
+			return err
+		})
+		if err != nil {
+			log.FromContext(ctx).Error(err, "Failed to update leader IP info in ConfigMap")
+		}
+
+		readyCh <- struct{}{}
+		return nil
+	}))
+	return readyCh
+}
+
+// AssignIndex assigns a temporary index (1-512) for Pod-to-DevicePlugin
+// communication. It uses an atomic increment for thread-safe assignment and
+// wraps around from 512 back to 1 (a simple modulo operation).
+func (s *IndexAllocator) AssignIndex(podName string) (int, error) {
+	if !s.IsLeader {
+		return 0, fmt.Errorf("only leader can assign index")
+	}
+
+	// Atomic increment and wrap around
+	next := atomic.AddInt64(&s.currentIndex, 1)
+	index := int((next-1)%IndexRangeEnd) + IndexRangeStart
+
+	return index, nil
+}
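`AssignIndex` is leader-gated (followers return an error), and `SetupWithManager` publishes the leader's IP in the `LeaderInfoConfigMapName` ConfigMap so node-side clients can find the replica that is allowed to assign. The wrap-around arithmetic can be checked in isolation:

```go
package main

import "fmt"

// Quick check of the arithmetic used by AssignIndex: counter values 1..512
// map to indices 1..512, and 513 wraps back to 1.
func main() {
	const rangeStart, rangeEnd = 1, 512
	for _, next := range []int64{1, 2, 512, 513, 1024, 1025} {
		index := int((next-1)%rangeEnd) + rangeStart
		fmt.Printf("counter=%d -> index=%d\n", next, index)
	}
	// Output: 1->1, 2->2, 512->512, 513->1, 1024->512, 1025->1
}
```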