Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
"Hygon",
"iface",
"imageutils",
"indexallocator",
"influxdata",
"internalcache",
"internalqueue",
Expand Down
1 change: 1 addition & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ func startHttpServerForTFClient(
connectionRouter, assignHostPortRouter, assignIndexRouter, allocatorInfoRouter, nodeScalerInfoRouter, leaderChan,
)
go func() {
// Port defaults to 8080, set with env var `PORT`
err := httpServer.Run()
if err != nil {
setupLog.Error(err, "problem running HTTP server")
Expand Down
4 changes: 2 additions & 2 deletions internal/indexallocator/indexallocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,12 @@ func (s *IndexAllocator) SetupWithManager(ctx context.Context, mgr manager.Manag
// Index wraps around from 512 to 1 (simple modulo operation)
func (s *IndexAllocator) AssignIndex(podName string) (int, error) {
if !s.IsLeader {
log.FromContext(s.ctx).Error(nil, "only leader can assign index", "podName", podName)
return 0, fmt.Errorf("only leader can assign index")
}

// Atomic increment and wrap around
next := atomic.AddInt64(&s.currentIndex, 1)
index := int((next-1)%IndexRangeEnd) + IndexRangeStart

log.FromContext(s.ctx).Info("assigned index successfully", "podName", podName, "index", index)
return index, nil
}
6 changes: 2 additions & 4 deletions internal/server/router/assign_host_port.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"net/http"

"github.com/NexusGPU/tensor-fusion/internal/constants"
"github.com/NexusGPU/tensor-fusion/internal/portallocator"
"github.com/NexusGPU/tensor-fusion/internal/utils"
"github.com/gin-gonic/gin"
Expand All @@ -26,9 +25,8 @@ func NewAssignHostPortRouter(ctx context.Context, allocator *portallocator.PortA

func (r *AssignHostPortRouter) AssignHostPort(ctx *gin.Context) {
podName := ctx.Query("podName")
token := ctx.Request.Header.Get(constants.AuthorizationHeader)

if token == "" {
token, ok := utils.ExtractBearerToken(ctx)
if !ok {
log.FromContext(ctx).Error(nil, "assigned host port failed, missing token", "podName", podName)
ctx.String(http.StatusUnauthorized, "missing authorization header")
return
Expand Down
9 changes: 4 additions & 5 deletions internal/server/router/assign_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"net/http"

"github.com/NexusGPU/tensor-fusion/internal/constants"
"github.com/NexusGPU/tensor-fusion/internal/indexallocator"
"github.com/NexusGPU/tensor-fusion/internal/utils"
"github.com/gin-gonic/gin"
Expand All @@ -26,9 +25,8 @@ func NewAssignIndexRouter(ctx context.Context, allocator *indexallocator.IndexAl

func (r *AssignIndexRouter) AssignIndex(ctx *gin.Context) {
podName := ctx.Query("podName")
token := ctx.Request.Header.Get(constants.AuthorizationHeader)

if token == "" {
token, ok := utils.ExtractBearerToken(ctx)
if !ok {
log.FromContext(ctx).Error(nil, "assigned index failed, missing token", "podName", podName)
ctx.String(http.StatusUnauthorized, "missing authorization header")
return
Expand All @@ -47,7 +45,8 @@ func (r *AssignIndexRouter) AssignIndex(ctx *gin.Context) {
return
}
if !tokenReview.Status.Authenticated || tokenReview.Status.User.Username != utils.GetSelfServiceAccountNameFull() {
log.FromContext(ctx).Error(nil, "assigned index failed, token invalid", "podName", podName)
log.FromContext(ctx).Error(nil, "assigned index failed, token invalid", "podName", podName,
"authPassed", tokenReview.Status.Authenticated, "username", tokenReview.Status.User.Username)
ctx.String(http.StatusUnauthorized, "token authentication failed")
return
}
Expand Down
5 changes: 5 additions & 0 deletions internal/utils/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ const (
var selfServiceAccountName string

func InitServiceAccountConfig() {
if os.Getenv("IMPERSONATE_SERVICE_ACCOUNT") != "" {
selfServiceAccountName = os.Getenv("IMPERSONATE_SERVICE_ACCOUNT")
ctrl.Log.Info("impersonate service account mode detected", "name", selfServiceAccountName)
return
}
data, err := os.ReadFile(ServiceAccountTokenPath)
if err != nil {
ctrl.Log.Info("service account token not found, run outside of Kubernetes cluster")
Expand Down
26 changes: 26 additions & 0 deletions internal/utils/svr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package utils

import (
"strings"

"github.com/NexusGPU/tensor-fusion/internal/constants"
"github.com/gin-gonic/gin"
)

const BearerPrefix = "Bearer "

// ExtractBearerToken extracts the authorization token from the gin context.
// It handles both cases: token with "Bearer " prefix and token without prefix.
// Returns the token string (with Bearer prefix stripped if present) and true if token exists.
// Returns empty string and false if token is missing.
func ExtractBearerToken(ctx *gin.Context) (string, bool) {
token := ctx.Request.Header.Get(constants.AuthorizationHeader)
if token == "" {
return "", false
}

// Strip Bearer prefix if present
token = strings.TrimPrefix(token, BearerPrefix)

return token, true
}
14 changes: 11 additions & 3 deletions internal/webhook/v1/pod_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"io"
"net/http"
"os"
"strconv"
"time"

Expand All @@ -46,6 +47,13 @@ import (
)

var httpClient = &http.Client{Timeout: 10 * time.Second}
var operatorPort = "8080"

func init() {
if port := os.Getenv("PORT"); port != "" {
operatorPort = port
}
}

// SetupPodWebhookWithManager registers the webhook for Pod in the manager.
func SetupPodWebhookWithManager(mgr ctrl.Manager, portAllocator *portallocator.PortAllocator, indexAllocator *indexallocator.IndexAllocator, pricingProvider pricing.PricingProvider) error {
Expand Down Expand Up @@ -386,7 +394,7 @@ func (m *TensorFusionPodMutator) assignDeviceAllocationIndex(ctx context.Context
var indexErr error
podIdentifier := pod.Name
if podIdentifier == "" {
// For Deployment/StatefulSet created pods, Name might be empty, use GenerateName + UID
// For Deployment/StatefulSet created pods, Name might be empty, use GenerateName + UID(maybe empty)
podIdentifier = pod.GenerateName + string(pod.UID)
}

Expand Down Expand Up @@ -576,7 +584,7 @@ func (m *TensorFusionPodMutator) assignClusterHostPortFromLeader(pod *corev1.Pod
return 0, fmt.Errorf("operator leader IP not found")
}

urlStr := fmt.Sprintf("http://%s:8080/assign-host-port?podName=%s", leaderIP, pod.Name)
urlStr := fmt.Sprintf("http://%s:%s/api/assign-host-port?podName=%s", leaderIP, operatorPort, pod.Name)
req, err := http.NewRequest("GET", urlStr, nil)
if err != nil {
return 0, err
Expand Down Expand Up @@ -613,7 +621,7 @@ func (m *TensorFusionPodMutator) assignIndexFromLeader(ctx context.Context, pod
if podIdentifier == "" {
podIdentifier = pod.GenerateName + string(pod.UID)
}
urlStr := fmt.Sprintf("http://%s:8080/assign-index?podName=%s", leaderIP, podIdentifier)
urlStr := fmt.Sprintf("http://%s:%s/api/assign-index?podName=%s", leaderIP, operatorPort, podIdentifier)
req, err := http.NewRequestWithContext(ctx, "GET", urlStr, nil)
if err != nil {
return 0, err
Expand Down