Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ClusterMesh/KVStoreMesh Readiness and Bootstrap QPS #30361

Merged
merged 12 commits into from
Mar 6, 2024
12 changes: 12 additions & 0 deletions Documentation/helm-values.rst

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions Documentation/observability/metrics.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1103,6 +1103,15 @@ Exported Metrics
All metrics are exported under the ``cilium_clustermesh_apiserver_``
Prometheus namespace.

Bootstrap
~~~~~~~~~

======================================== ============================================ ========================================================
Name                                     Labels                                       Description
======================================== ============================================ ========================================================
``bootstrap_seconds``                    ``source_cluster``                           Duration in seconds to complete bootstrap
======================================== ============================================ ========================================================

KVstore
~~~~~~~

Expand Down Expand Up @@ -1166,6 +1175,15 @@ Exported Metrics

All metrics are exported under the ``cilium_kvstoremesh_`` Prometheus namespace.

Bootstrap
~~~~~~~~~

======================================== ============================================ ========================================================
Name                                     Labels                                       Description
======================================== ============================================ ========================================================
``bootstrap_seconds``                    ``source_cluster``                           Duration in seconds to complete bootstrap
======================================== ============================================ ========================================================

Remote clusters
~~~~~~~~~~~~~~~

Expand Down
11 changes: 9 additions & 2 deletions clustermesh-apiserver/clustermesh/cells.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ package clustermesh

import (
cmk8s "github.com/cilium/cilium/clustermesh-apiserver/clustermesh/k8s"
"github.com/cilium/cilium/clustermesh-apiserver/health"
cmmetrics "github.com/cilium/cilium/clustermesh-apiserver/metrics"
"github.com/cilium/cilium/clustermesh-apiserver/option"
"github.com/cilium/cilium/clustermesh-apiserver/syncstate"
cmtypes "github.com/cilium/cilium/pkg/clustermesh/types"
"github.com/cilium/cilium/pkg/controller"
"github.com/cilium/cilium/pkg/defaults"
Expand Down Expand Up @@ -45,11 +47,16 @@ var Cell = cell.Module(
cmk8s.ResourcesCell,

kvstore.Cell(kvstore.EtcdBackendName),
cell.Provide(func() *kvstore.ExtraOptions { return nil }),
cell.Provide(func(ss syncstate.SyncState) *kvstore.ExtraOptions {
return &kvstore.ExtraOptions{
BootstrapComplete: ss.WaitChannel(),
}
}),
store.Cell,

heartbeat.Cell,
healthAPIServerCell,
HealthAPIEndpointsCell,
health.HealthAPIServerCell,

cmmetrics.Cell,

Expand Down
94 changes: 31 additions & 63 deletions clustermesh-apiserver/clustermesh/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,81 +4,49 @@
package clustermesh

import (
"errors"
"fmt"
"net/http"

"github.com/spf13/pflag"
"github.com/sirupsen/logrus"

"github.com/cilium/cilium/pkg/defaults"
"github.com/cilium/cilium/clustermesh-apiserver/health"
"github.com/cilium/cilium/clustermesh-apiserver/syncstate"
"github.com/cilium/cilium/pkg/hive/cell"
k8sClient "github.com/cilium/cilium/pkg/k8s/client"
"github.com/cilium/cilium/pkg/option"
)

type HealthAPIServerConfig struct {
ClusterMeshHealthPort int
}

func (HealthAPIServerConfig) Flags(flags *pflag.FlagSet) {
flags.Int(option.ClusterMeshHealthPort, defaults.ClusterMeshHealthPort, "TCP port for ClusterMesh apiserver health API")
}
var HealthAPIEndpointsCell = cell.Module(
"health-api-endpoints",
"ClusterMesh Health API Endpoints",

var healthAPIServerCell = cell.Module(
"health-api-server",
"ClusterMesh Health API Server",

cell.Config(HealthAPIServerConfig{}),
cell.Provide(NewSyncState),
cell.Invoke(registerHealthAPIServer),
syncstate.Cell,
cell.Provide(healthEndpoints),
)

func registerHealthAPIServer(lc cell.Lifecycle, clientset k8sClient.Clientset, cfg HealthAPIServerConfig, syncState *SyncState) {
mux := http.NewServeMux()

mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
statusCode := http.StatusOK
reply := "ok"

if _, err := clientset.Discovery().ServerVersion(); err != nil {
statusCode = http.StatusInternalServerError
reply = err.Error()
}
w.WriteHeader(statusCode)
if _, err := w.Write([]byte(reply)); err != nil {
log.WithError(err).Error("Failed to respond to /healthz request")
}
})
type healthParameters struct {
cell.In

mux.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) {
statusCode := http.StatusInternalServerError
reply := "NotReady"

if syncState.Complete() {
statusCode = http.StatusOK
reply = "Ready"
}
w.WriteHeader(statusCode)
if _, err := w.Write([]byte(reply)); err != nil {
log.WithError(err).Error("Failed to respond to /readyz request")
}
})

srv := &http.Server{
Handler: mux,
Addr: fmt.Sprintf(":%d", cfg.ClusterMeshHealthPort),
}
Clientset k8sClient.Clientset
SyncState syncstate.SyncState
Logger logrus.FieldLogger
}

lc.Append(cell.Hook{
OnStart: func(cell.HookContext) error {
go func() {
log.Info("Started health API")
if err := srv.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
log.WithError(err).Fatal("Unable to start health API")
func healthEndpoints(params healthParameters) []health.EndpointFunc {
return []health.EndpointFunc{
{
Path: "/readyz",
HandlerFunc: func(w http.ResponseWriter, r *http.Request) {
statusCode := http.StatusInternalServerError
reply := "NotReady"

if params.SyncState.Complete() {
statusCode = http.StatusOK
reply = "Ready"
}
w.WriteHeader(statusCode)
if _, err := w.Write([]byte(reply)); err != nil {
params.Logger.WithError(err).Error("Failed to respond to /readyz request")
}
}()
return nil
},
},
OnStop: func(ctx cell.HookContext) error { return srv.Shutdown(ctx) },
})
}
}
35 changes: 3 additions & 32 deletions clustermesh-apiserver/clustermesh/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"

cmk8s "github.com/cilium/cilium/clustermesh-apiserver/clustermesh/k8s"
"github.com/cilium/cilium/clustermesh-apiserver/syncstate"
operatorWatchers "github.com/cilium/cilium/operator/watchers"
cmtypes "github.com/cilium/cilium/pkg/clustermesh/types"
cmutils "github.com/cilium/cilium/pkg/clustermesh/utils"
Expand All @@ -31,7 +32,6 @@ import (
"github.com/cilium/cilium/pkg/kvstore"
"github.com/cilium/cilium/pkg/kvstore/store"
"github.com/cilium/cilium/pkg/labels"
"github.com/cilium/cilium/pkg/lock"
"github.com/cilium/cilium/pkg/logging"
"github.com/cilium/cilium/pkg/logging/logfields"
"github.com/cilium/cilium/pkg/metrics"
Expand Down Expand Up @@ -79,7 +79,7 @@ type parameters struct {
Resources cmk8s.Resources
BackendPromise promise.Promise[kvstore.BackendOperations]
StoreFactory store.Factory
SyncState *SyncState
thorn3r marked this conversation as resolved.
Show resolved Hide resolved
SyncState syncstate.SyncState
}

func registerHooks(lc cell.Lifecycle, params parameters) error {
Expand All @@ -101,35 +101,6 @@ func registerHooks(lc cell.Lifecycle, params parameters) error {
return nil
}

// NewSyncState allocates a SyncState whose embedded StoppableWaitGroup is
// initialized from the value returned by lock.NewStoppableWaitGroup.
func NewSyncState() *SyncState {
	ss := new(SyncState)
	ss.StoppableWaitGroup = *lock.NewStoppableWaitGroup()
	return ss
}

// SyncState tracks whether the various resources have been synchronized to the
// kvstore. It embeds lock.StoppableWaitGroup, so the wait group's methods
// (Add, Done, WaitChannel, ...) are available directly on a SyncState.
type SyncState struct {
lock.StoppableWaitGroup
}

// Complete reports whether every tracked resource has been synchronized to the
// kvstore. It never blocks: it polls the wait channel once and returns false
// if nothing can be received from it yet.
func (ss *SyncState) Complete() bool {
	select {
	case <-ss.WaitChannel():
		return true
	default:
	}
	return false
}

// WaitForResource registers one more resource with the SyncState and hands
// back the callback to invoke once that resource has been synchronized. The
// callback ignores its context argument and simply marks the resource as done.
func (ss *SyncState) WaitForResource() func(context.Context) {
	ss.Add()

	markSynced := func(context.Context) {
		ss.Done()
	}
	return markSynced
}

type identitySynchronizer struct {
store store.SyncStore
encoder func([]byte) string
Expand Down Expand Up @@ -377,7 +348,7 @@ func startServer(
backend kvstore.BackendOperations,
resources cmk8s.Resources,
factory store.Factory,
syncState *SyncState,
syncState syncstate.SyncState,
) {
log.WithFields(logrus.Fields{
"cluster-name": cinfo.Name,
Expand Down
74 changes: 74 additions & 0 deletions clustermesh-apiserver/health/health.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium

package health

import (
	"errors"
	"fmt"
	"net/http"
	"time"

	"github.com/sirupsen/logrus"
	"github.com/spf13/pflag"

	"github.com/cilium/cilium/pkg/hive/cell"
)

// HealthAPIServerConfig holds the configuration of the health API server.
type HealthAPIServerConfig struct {
// HealthPort is the TCP port the health API server listens on.
HealthPort int
}

// Flags registers the "health-port" integer flag on the given flag set, using
// the HealthPort value of the receiver as the flag's default.
func (def HealthAPIServerConfig) Flags(flags *pflag.FlagSet) {
	const usage = "TCP port for ClusterMesh health API"

	flags.Int("health-port", def.HealthPort, usage)
}

// DefaultHealthAPIServerConfig is the default health API server configuration:
// it is the value passed to cell.Config in HealthAPIServerCell, and sets the
// health port to 9880.
var DefaultHealthAPIServerConfig = HealthAPIServerConfig{
HealthPort: 9880,
}

// HealthAPIServerCell is the "health-api-server" module. It registers the
// health API server configuration (with DefaultHealthAPIServerConfig as the
// defaults) and invokes registerHealthAPIServer. The endpoints served are not
// defined here: they are received as a []EndpointFunc dependency, which the
// including application is expected to provide.
var HealthAPIServerCell = cell.Module(
"health-api-server",
"ClusterMesh Health API Server",

cell.Config(DefaultHealthAPIServerConfig),
cell.Invoke(registerHealthAPIServer),
)

// parameters groups the dependencies consumed by registerHealthAPIServer.
// NOTE(review): embedding cell.In presumably marks this as a hive input struct
// whose fields are populated by dependency injection — confirm against the
// hive documentation.
type parameters struct {
cell.In

// Config carries the port the health API server listens on.
Config HealthAPIServerConfig
// Logger is used to report the server start and fatal listen errors.
Logger logrus.FieldLogger
// EndpointFuncs lists the path/handler pairs registered on the server mux.
EndpointFuncs []EndpointFunc
}

// EndpointFunc associates an HTTP handler with the path it is served on by the
// health API server.
type EndpointFunc struct {
// Path is the pattern the handler is registered under (e.g. "/readyz").
Path string
// HandlerFunc handles the requests matching Path.
HandlerFunc http.HandlerFunc
}

// registerHealthAPIServer builds the health API HTTP server and binds it to
// the given lifecycle.
//
// Every entry of params.EndpointFuncs is registered on a dedicated ServeMux,
// and the server listens on all interfaces on params.Config.HealthPort. The
// server is started in the background by the start hook, and gracefully shut
// down by the stop hook (bounded by the stop hook context).
func registerHealthAPIServer(lc cell.Lifecycle, params parameters) {
	// readHeaderTimeout bounds how long a client may take to send the request
	// headers. Without it, a client that opens a connection and never completes
	// its request would hold that connection (and its goroutine) forever.
	const readHeaderTimeout = 10 * time.Second

	mux := http.NewServeMux()

	for _, endpoint := range params.EndpointFuncs {
		mux.HandleFunc(endpoint.Path, endpoint.HandlerFunc)
	}

	srv := &http.Server{
		Handler:           mux,
		Addr:              fmt.Sprintf(":%d", params.Config.HealthPort),
		ReadHeaderTimeout: readHeaderTimeout,
	}

	lc.Append(cell.Hook{
		OnStart: func(cell.HookContext) error {
			// ListenAndServe blocks until the server terminates, hence it is
			// run in a separate goroutine so that the start hook can return.
			go func() {
				params.Logger.Info("Started health API")
				// ErrServerClosed is the expected outcome of Shutdown being
				// called by the stop hook; any other error is fatal.
				if err := srv.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
					params.Logger.WithError(err).Fatal("Unable to start health API")
				}
			}()
			return nil
		},
		OnStop: func(ctx cell.HookContext) error { return srv.Shutdown(ctx) },
	})
}
16 changes: 15 additions & 1 deletion clustermesh-apiserver/kvstoremesh/cells.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@
package kvstoremesh

import (
"github.com/cilium/cilium/clustermesh-apiserver/health"
cmmetrics "github.com/cilium/cilium/clustermesh-apiserver/metrics"
"github.com/cilium/cilium/clustermesh-apiserver/option"
"github.com/cilium/cilium/clustermesh-apiserver/syncstate"
"github.com/cilium/cilium/pkg/clustermesh/kvstoremesh"
cmtypes "github.com/cilium/cilium/pkg/clustermesh/types"
"github.com/cilium/cilium/pkg/controller"
"github.com/cilium/cilium/pkg/defaults"
"github.com/cilium/cilium/pkg/gops"
"github.com/cilium/cilium/pkg/hive/cell"
"github.com/cilium/cilium/pkg/hive/job"
"github.com/cilium/cilium/pkg/kvstore"
"github.com/cilium/cilium/pkg/pprof"
)
Expand All @@ -21,6 +24,7 @@ var Cell = cell.Module(
"Cilium KVStoreMesh",

cell.Config(option.DefaultLegacyKVStoreMeshConfig),
cell.Config(kvstoremesh.DefaultConfig),

cell.Config(cmtypes.DefaultClusterInfo),
cell.Invoke(registerClusterInfoValidator),
Expand All @@ -35,9 +39,19 @@ var Cell = cell.Module(
gops.Cell(defaults.GopsPortKVStoreMesh),
cmmetrics.Cell,

HealthAPIEndpointsCell,
health.HealthAPIServerCell,

kvstore.Cell(kvstore.EtcdBackendName),
cell.Provide(func() *kvstore.ExtraOptions { return nil }),
cell.Provide(func(ss syncstate.SyncState) *kvstore.ExtraOptions {
return &kvstore.ExtraOptions{
BootstrapComplete: ss.WaitChannel(),
}
}),
kvstoremesh.Cell,

job.Cell,
cell.Invoke(kvstoremesh.RegisterSyncWaiter),

cell.Invoke(func(*kvstoremesh.KVStoreMesh) {}),
)