Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 45 additions & 1 deletion cmd/shim/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,18 @@ import (
"path/filepath"

"github.com/cobaltcore-dev/cortex/api/v1alpha1"
"github.com/cobaltcore-dev/cortex/internal/shim/placement"
"github.com/cobaltcore-dev/cortex/pkg/conf"
"github.com/cobaltcore-dev/cortex/pkg/monitoring"
"github.com/cobaltcore-dev/cortex/pkg/multicluster"
hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1"
"github.com/sapcc/go-bits/httpext"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/certwatcher"
"sigs.k8s.io/controller-runtime/pkg/cluster"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/manager"
Expand Down Expand Up @@ -51,6 +54,7 @@ func main() {
restConfig := ctrl.GetConfigOrDie()

var metricsAddr string
var apiBindAddr string
var metricsCertPath, metricsCertName, metricsCertKey string
var webhookCertPath, webhookCertName, webhookCertKey string
// The shim does not require leader election, but this flag is provided to
Expand All @@ -59,9 +63,11 @@ func main() {
var probeAddr string
var secureMetrics bool
var enableHTTP2 bool
var enablePlacementShim bool
var tlsOpts []func(*tls.Config)
flag.StringVar(&metricsAddr, "metrics-bind-address", "0", "The address the metrics endpoint binds to. "+
"Use :8443 for HTTPS or :8080 for HTTP, or leave as 0 to disable the metrics service.")
Comment thread
PhilippMatthes marked this conversation as resolved.
flag.StringVar(&apiBindAddr, "api-bind-address", ":8080", "The address the shim API server binds to.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
flag.BoolVar(&enableLeaderElection, "leader-elect", false,
"Enable leader election for controller manager. "+
Expand All @@ -77,6 +83,8 @@ func main() {
flag.StringVar(&metricsCertKey, "metrics-cert-key", "tls.key", "The name of the metrics server key file.")
flag.BoolVar(&enableHTTP2, "enable-http2", false,
"If set, HTTP/2 will be enabled for the metrics and webhook servers")
flag.BoolVar(&enablePlacementShim, "placement-shim", false,
"If set, the placement API shim handlers are registered on the API server.")
Comment thread
PhilippMatthes marked this conversation as resolved.
opts := zap.Options{
Development: true,
}
Expand All @@ -90,6 +98,13 @@ func main() {
os.Exit(1)
}

// Check that the metrics and API bind addresses don't overlap.
if metricsAddr != "0" && metricsAddr == apiBindAddr {
err := errors.New("metrics-bind-address and api-bind-address must not be the same")
setupLog.Error(err, "invalid configuration", "metrics-bind-address", metricsAddr, "api-bind-address", apiBindAddr)
os.Exit(1)
}

ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))

// if the enable-http2 flag is false (the default), http/2 should be disabled
Expand Down Expand Up @@ -195,7 +210,26 @@ func main() {
os.Exit(1)
}

// TODO: Initialize multicluster client here.
homeCluster, err := cluster.New(restConfig, func(o *cluster.Options) { o.Scheme = scheme })
if err != nil {
setupLog.Error(err, "unable to create home cluster")
os.Exit(1)
}
if err := mgr.Add(homeCluster); err != nil {
setupLog.Error(err, "unable to add home cluster")
os.Exit(1)
}
multiclusterClient := &multicluster.Client{
HomeCluster: homeCluster,
HomeRestConfig: restConfig,
HomeScheme: scheme,
ResourceRouters: multicluster.DefaultResourceRouters,
}
multiclusterClientConfig := conf.GetConfigOrDie[multicluster.ClientConfig]()
if err := multiclusterClient.InitFromConf(ctx, mgr, multiclusterClientConfig); err != nil {
setupLog.Error(err, "unable to initialize multicluster client")
os.Exit(1)
}

// Our custom monitoring registry can add prometheus labels to all metrics.
// This is useful to distinguish metrics from different deployments.
Expand All @@ -204,6 +238,16 @@ func main() {

// API endpoint.
mux := http.NewServeMux()
var placementShim *placement.Shim
if enablePlacementShim {
placementShim = &placement.Shim{Client: multiclusterClient}
setupLog.Info("Adding placement shim to manager")
if err := placementShim.SetupWithManager(ctx, mgr); err != nil {
setupLog.Error(err, "unable to set up placement shim")
os.Exit(1)
}
placementShim.RegisterRoutes(mux)
}

// +kubebuilder:scaffold:builder

Expand Down
21 changes: 21 additions & 0 deletions helm/bundles/cortex-placement-shim/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,29 @@ alerts:

cortex-shim:
namePrefix: cortex-placement
deployment:
container:
extraArgs: ["--placement-shim=true"]
conf:
apiservers:
home:
gvks:
- kvm.cloud.sap/v1/Hypervisor
- kvm.cloud.sap/v1/HypervisorList
monitoring:
labels:
github_org: cobaltcore-dev
github_repo: cortex
# Uncomment and set the following values to enable SSO for the placement
# shim. The shim will use the provided SSO credentials to talk to openstack
# over ingress.
# sso:
# cert: |
# -----BEGIN CERTIFICATE-----
# Your certificate here
# -----END CERTIFICATE-----
# certKey: |
# -----BEGIN PRIVATE KEY-----
# Your private key here
# -----END PRIVATE KEY-----
# selfSigned: "false"
6 changes: 3 additions & 3 deletions helm/library/cortex-shim/templates/deployment.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
# This file is safe from kubebuilder edit --plugins=helm/v1-alpha
# If you want to re-generate, add the --force flag.

{{- if .Values.deployment.enable }}
apiVersion: apps/v1
kind: Deployment
Expand Down Expand Up @@ -32,6 +29,9 @@ spec:
{{- range .Values.deployment.container.args }}
- {{ . }}
{{- end }}
{{- range .Values.deployment.container.extraArgs }}
- {{ . }}
{{- end }}
ports:
- name: api
containerPort: 8080
Expand Down
5 changes: 2 additions & 3 deletions helm/library/cortex-shim/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ deployment:
image:
repository: ghcr.io/cobaltcore-dev/cortex-shim
args:
- "--api-bind-address=:8080"
- "--metrics-bind-address=:2112"
- "--health-probe-bind-address=:8081"
- "--metrics-secure=false"
extraArgs: []
resources:
limits:
cpu: 500m
Expand Down Expand Up @@ -53,9 +55,6 @@ rbac:
prometheus:
enable: true

global:
conf: {}

# Use this to unambiguate multiple cortex deployments in the same cluster.
namePrefix: cortex
conf: {} # No config for now that's needed by all the shims.
42 changes: 42 additions & 0 deletions internal/shim/placement/handle_allocation_candidates.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright SAP SE
// SPDX-License-Identifier: Apache-2.0

package placement

import (
"net/http"

logf "sigs.k8s.io/controller-runtime/pkg/log"
)

// HandleListAllocationCandidates handles GET /allocation_candidates requests.
//
// Returns a collection of allocation requests and resource provider summaries
// that can satisfy a given set of resource and trait requirements. This is the
// primary endpoint used by Nova's scheduler to find suitable hosts for
// instance placement.
//
// The resources query parameter specifies required capacity as a comma-
// separated list (e.g. VCPU:4,MEMORY_MB:2048,DISK_GB:64). The required
// parameter filters by traits, supporting forbidden traits via ! prefix
// (since 1.22) and the in: syntax for any-of semantics (since 1.39).
// The member_of parameter filters by aggregate membership with support for
// forbidden aggregates via ! prefix (since 1.32).
//
// Since microversion 1.25, granular request groups are supported via numbered
// suffixes (resourcesN, requiredN, member_ofN) to express requirements that
// may be satisfied by different providers. The group_policy parameter (1.26+)
// controls whether groups must each be satisfied by a single provider or may
// span multiple. The in_tree parameter (1.31+) constrains results to a
// specific provider tree.
//
// Each returned allocation request is directly usable as the body for
// PUT /allocations/{consumer_uuid}. The provider_summaries section includes
// inventory capacity and usage for informed decision-making. Available since
// microversion 1.10.
func (s *Shim) HandleListAllocationCandidates(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
log := logf.FromContext(ctx)
log.Info("placement request", "method", r.Method, "path", r.URL.Path)
s.forward(w, r)
}
21 changes: 21 additions & 0 deletions internal/shim/placement/handle_allocation_candidates_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright SAP SE
// SPDX-License-Identifier: Apache-2.0

package placement

import (
"net/http"
"testing"
)

func TestHandleListAllocationCandidates(t *testing.T) {
var gotPath string
s := newTestShim(t, http.StatusOK, `{"allocation_requests":[]}`, &gotPath)
w := serveHandler(t, "GET", "/allocation_candidates", s.HandleListAllocationCandidates, "/allocation_candidates")
if w.Code != http.StatusOK {
t.Fatalf("status = %d, want %d", w.Code, http.StatusOK)
}
if gotPath != "/allocation_candidates" {
t.Fatalf("upstream path = %q, want /allocation_candidates", gotPath)
}
}
95 changes: 95 additions & 0 deletions internal/shim/placement/handle_allocations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright SAP SE
// SPDX-License-Identifier: Apache-2.0

package placement

import (
"net/http"

logf "sigs.k8s.io/controller-runtime/pkg/log"
)

// HandleManageAllocations handles POST /allocations requests.
//
// Atomically creates, updates, or deletes allocations for multiple consumers
// in a single request. This is the primary mechanism for operations that must
// modify allocations across several consumers atomically, such as live
// migrations and move operations where resources are transferred from one
// consumer to another. Available since microversion 1.13.
//
// The request body is keyed by consumer UUID, each containing an allocations
// dictionary (keyed by resource provider UUID), along with project_id and
// user_id. Since microversion 1.28, consumer_generation enables consumer-
// level concurrency control. Since microversion 1.38, a consumer_type field
// (e.g. INSTANCE, MIGRATION) is supported. Returns 204 No Content on
// success, or 409 Conflict if inventory is insufficient or a concurrent
// update is detected (error code: placement.concurrent_update).
func (s *Shim) HandleManageAllocations(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
log := logf.FromContext(ctx)
log.Info("placement request", "method", r.Method, "path", r.URL.Path)
s.forward(w, r)
}

// HandleListAllocations handles GET /allocations/{consumer_uuid} requests.
//
// Returns all allocation records for the consumer identified by
// {consumer_uuid}, across all resource providers. The response contains an
// allocations dictionary keyed by resource provider UUID. If the consumer has
// no allocations, an empty dictionary is returned.
//
// The response has grown across microversions: project_id and user_id were
// added at 1.12, consumer_generation at 1.28, and consumer_type at 1.38.
// The consumer_generation and consumer_type fields are absent when the
// consumer has no allocations.
func (s *Shim) HandleListAllocations(w http.ResponseWriter, r *http.Request) {
consumerUUID, ok := requiredUUIDPathParam(w, r, "consumer_uuid")
if !ok {
return
}
ctx := r.Context()
log := logf.FromContext(ctx)
log.Info("placement request", "method", r.Method, "path", r.URL.Path,
"consumer_uuid", consumerUUID)
s.forward(w, r)
}

// HandleUpdateAllocations handles PUT /allocations/{consumer_uuid} requests.
//
// Creates or replaces all allocation records for a single consumer. If
// allocations already exist for this consumer, they are entirely replaced
// by the new set. The request format changed at microversion 1.12 from an
// array-based layout to an object keyed by resource provider UUID.
// Microversion 1.28 added consumer_generation for concurrency control,
// and 1.38 introduced consumer_type.
//
// Returns 204 No Content on success. Returns 409 Conflict if there is
// insufficient inventory or if a concurrent update was detected.
func (s *Shim) HandleUpdateAllocations(w http.ResponseWriter, r *http.Request) {
consumerUUID, ok := requiredUUIDPathParam(w, r, "consumer_uuid")
if !ok {
return
}
ctx := r.Context()
log := logf.FromContext(ctx)
log.Info("placement request", "method", r.Method, "path", r.URL.Path,
"consumer_uuid", consumerUUID)
s.forward(w, r)
}

// HandleDeleteAllocations handles DELETE /allocations/{consumer_uuid} requests.
//
// Removes all allocation records for the consumer across all resource
// providers. Returns 204 No Content on success, or 404 Not Found if the
// consumer has no existing allocations.
func (s *Shim) HandleDeleteAllocations(w http.ResponseWriter, r *http.Request) {
consumerUUID, ok := requiredUUIDPathParam(w, r, "consumer_uuid")
if !ok {
return
}
ctx := r.Context()
log := logf.FromContext(ctx)
log.Info("placement request", "method", r.Method, "path", r.URL.Path,
"consumer_uuid", consumerUUID)
s.forward(w, r)
}
Loading
Loading