From 70dd29dc8aa97cbacfbc688f6ba3817a8e22ead5 Mon Sep 17 00:00:00 2001 From: code2life Date: Fri, 28 Nov 2025 17:43:55 +0800 Subject: [PATCH 1/2] fix: forward leader api auth issue --- internal/server/router/assign_host_port.go | 6 ++--- internal/server/router/assign_index.go | 9 ++++---- internal/utils/config.go | 5 +++++ internal/utils/svr.go | 26 ++++++++++++++++++++++ 4 files changed, 37 insertions(+), 9 deletions(-) create mode 100644 internal/utils/svr.go diff --git a/internal/server/router/assign_host_port.go b/internal/server/router/assign_host_port.go index 6c94fe30..af0b7156 100644 --- a/internal/server/router/assign_host_port.go +++ b/internal/server/router/assign_host_port.go @@ -5,7 +5,6 @@ import ( "fmt" "net/http" - "github.com/NexusGPU/tensor-fusion/internal/constants" "github.com/NexusGPU/tensor-fusion/internal/portallocator" "github.com/NexusGPU/tensor-fusion/internal/utils" "github.com/gin-gonic/gin" @@ -26,9 +25,8 @@ func NewAssignHostPortRouter(ctx context.Context, allocator *portallocator.PortA func (r *AssignHostPortRouter) AssignHostPort(ctx *gin.Context) { podName := ctx.Query("podName") - token := ctx.Request.Header.Get(constants.AuthorizationHeader) - - if token == "" { + token, ok := utils.ExtractBearerToken(ctx) + if !ok { log.FromContext(ctx).Error(nil, "assigned host port failed, missing token", "podName", podName) ctx.String(http.StatusUnauthorized, "missing authorization header") return diff --git a/internal/server/router/assign_index.go b/internal/server/router/assign_index.go index b3f0c6a0..e8fd43f6 100644 --- a/internal/server/router/assign_index.go +++ b/internal/server/router/assign_index.go @@ -5,7 +5,6 @@ import ( "fmt" "net/http" - "github.com/NexusGPU/tensor-fusion/internal/constants" "github.com/NexusGPU/tensor-fusion/internal/indexallocator" "github.com/NexusGPU/tensor-fusion/internal/utils" "github.com/gin-gonic/gin" @@ -26,9 +25,8 @@ func NewAssignIndexRouter(ctx context.Context, allocator *indexallocator.IndexAl func 
(r *AssignIndexRouter) AssignIndex(ctx *gin.Context) { podName := ctx.Query("podName") - token := ctx.Request.Header.Get(constants.AuthorizationHeader) - - if token == "" { + token, ok := utils.ExtractBearerToken(ctx) + if !ok { log.FromContext(ctx).Error(nil, "assigned index failed, missing token", "podName", podName) ctx.String(http.StatusUnauthorized, "missing authorization header") return @@ -47,7 +45,8 @@ func (r *AssignIndexRouter) AssignIndex(ctx *gin.Context) { return } if !tokenReview.Status.Authenticated || tokenReview.Status.User.Username != utils.GetSelfServiceAccountNameFull() { - log.FromContext(ctx).Error(nil, "assigned index failed, token invalid", "podName", podName) + log.FromContext(ctx).Error(nil, "assigned index failed, token invalid", "podName", podName, + "authPassed", tokenReview.Status.Authenticated, "username", tokenReview.Status.User.Username) ctx.String(http.StatusUnauthorized, "token authentication failed") return } diff --git a/internal/utils/config.go b/internal/utils/config.go index ba2e732b..23256dc2 100644 --- a/internal/utils/config.go +++ b/internal/utils/config.go @@ -29,6 +29,11 @@ const ( var selfServiceAccountName string func InitServiceAccountConfig() { + if os.Getenv("IMPERSONATE_SERVICE_ACCOUNT") != "" { + selfServiceAccountName = os.Getenv("IMPERSONATE_SERVICE_ACCOUNT") + ctrl.Log.Info("impersonate service account mode detected", "name", selfServiceAccountName) + return + } data, err := os.ReadFile(ServiceAccountTokenPath) if err != nil { ctrl.Log.Info("service account token not found, run outside of Kubernetes cluster") diff --git a/internal/utils/svr.go b/internal/utils/svr.go new file mode 100644 index 00000000..5ce348bf --- /dev/null +++ b/internal/utils/svr.go @@ -0,0 +1,26 @@ +package utils + +import ( + "strings" + + "github.com/NexusGPU/tensor-fusion/internal/constants" + "github.com/gin-gonic/gin" +) + +const BearerPrefix = "Bearer " + +// ExtractBearerToken extracts the authorization token from the gin context. 
+// It handles both cases: token with "Bearer " prefix and token without prefix. +// Returns the token string (with Bearer prefix stripped if present) and true if token exists. +// Returns empty string and false if token is missing. +func ExtractBearerToken(ctx *gin.Context) (string, bool) { + token := ctx.Request.Header.Get(constants.AuthorizationHeader) + if token == "" { + return "", false + } + + // Strip Bearer prefix if present + token = strings.TrimPrefix(token, BearerPrefix) + + return token, true +} From a1a10c9d0196ad862efa9b1ef24506e7aff96da3 Mon Sep 17 00:00:00 2001 From: Joey <569475269@qq.com> Date: Fri, 28 Nov 2025 18:03:46 +0800 Subject: [PATCH 2/2] fix: forward api auth issue --- .vscode/settings.json | 1 + cmd/main.go | 1 + internal/indexallocator/indexallocator.go | 4 ++-- internal/webhook/v1/pod_webhook.go | 14 +++++++++++--- 4 files changed, 15 insertions(+), 5 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index 20c71d38..5be70139 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -84,6 +84,7 @@ "Hygon", "iface", "imageutils", + "indexallocator", "influxdata", "internalcache", "internalqueue", diff --git a/cmd/main.go b/cmd/main.go index 540f2f11..c55a219c 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -343,6 +343,7 @@ func startHttpServerForTFClient( connectionRouter, assignHostPortRouter, assignIndexRouter, allocatorInfoRouter, nodeScalerInfoRouter, leaderChan, ) go func() { + // Port defaults to 8080, set with env var `PORT` err := httpServer.Run() if err != nil { setupLog.Error(err, "problem running HTTP server") diff --git a/internal/indexallocator/indexallocator.go b/internal/indexallocator/indexallocator.go index 31bef633..d839589e 100644 --- a/internal/indexallocator/indexallocator.go +++ b/internal/indexallocator/indexallocator.go @@ -86,12 +86,12 @@ func (s *IndexAllocator) SetupWithManager(ctx context.Context, mgr manager.Manag // Index wraps around from 512 to 1 (simple modulo operation) func 
(s *IndexAllocator) AssignIndex(podName string) (int, error) { if !s.IsLeader { + log.FromContext(s.ctx).Error(nil, "only leader can assign index", "podName", podName) return 0, fmt.Errorf("only leader can assign index") } - // Atomic increment and wrap around next := atomic.AddInt64(&s.currentIndex, 1) index := int((next-1)%IndexRangeEnd) + IndexRangeStart - + log.FromContext(s.ctx).Info("assigned index successfully", "podName", podName, "index", index) return index, nil } diff --git a/internal/webhook/v1/pod_webhook.go b/internal/webhook/v1/pod_webhook.go index 589ac46f..0841f423 100644 --- a/internal/webhook/v1/pod_webhook.go +++ b/internal/webhook/v1/pod_webhook.go @@ -22,6 +22,7 @@ import ( "fmt" "io" "net/http" + "os" "strconv" "time" @@ -46,6 +47,13 @@ import ( ) var httpClient = &http.Client{Timeout: 10 * time.Second} +var operatorPort = "8080" + +func init() { + if port := os.Getenv("PORT"); port != "" { + operatorPort = port + } +} // SetupPodWebhookWithManager registers the webhook for Pod in the manager. 
func SetupPodWebhookWithManager(mgr ctrl.Manager, portAllocator *portallocator.PortAllocator, indexAllocator *indexallocator.IndexAllocator, pricingProvider pricing.PricingProvider) error { @@ -386,7 +394,7 @@ func (m *TensorFusionPodMutator) assignDeviceAllocationIndex(ctx context.Context var indexErr error podIdentifier := pod.Name if podIdentifier == "" { - // For Deployment/StatefulSet created pods, Name might be empty, use GenerateName + UID + // For Deployment/StatefulSet created pods, Name might be empty, use GenerateName + UID (which may also be empty) podIdentifier = pod.GenerateName + string(pod.UID) } @@ -576,7 +584,7 @@ func (m *TensorFusionPodMutator) assignClusterHostPortFromLeader(pod *corev1.Pod return 0, fmt.Errorf("operator leader IP not found") } - urlStr := fmt.Sprintf("http://%s:8080/assign-host-port?podName=%s", leaderIP, pod.Name) + urlStr := fmt.Sprintf("http://%s:%s/api/assign-host-port?podName=%s", leaderIP, operatorPort, pod.Name) req, err := http.NewRequest("GET", urlStr, nil) if err != nil { return 0, err @@ -613,7 +621,7 @@ func (m *TensorFusionPodMutator) assignIndexFromLeader(ctx context.Context, pod if podIdentifier == "" { podIdentifier = pod.GenerateName + string(pod.UID) } - urlStr := fmt.Sprintf("http://%s:8080/assign-index?podName=%s", leaderIP, podIdentifier) + urlStr := fmt.Sprintf("http://%s:%s/api/assign-index?podName=%s", leaderIP, operatorPort, podIdentifier) req, err := http.NewRequestWithContext(ctx, "GET", urlStr, nil) if err != nil { return 0, err