Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

clustermesh-apiserver: rework services synchronization to improve performance #25260

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions Documentation/observability/metrics.rst
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,7 @@ Name Labels
``kvstore_operations_duration_seconds`` ``action``, ``kind``, ``outcome``, ``scope`` Enabled Duration of kvstore operation
``kvstore_events_queue_seconds`` ``action``, ``scope`` Enabled Duration of seconds of time received event was blocked before it could be queued
``kvstore_quorum_errors_total`` ``error`` Enabled Number of quorum errors
``kvstore_sync_queue_size`` ``scope``, ``source_cluster`` Enabled Number of elements queued for synchronization in the kvstore
======================================== ============================================ ========== ========================================================

Agent
Expand Down
37 changes: 15 additions & 22 deletions operator/watchers/k8s_service_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package watchers
import (
"context"
"sync"
"time"

"github.com/sirupsen/logrus"
v1meta "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -24,6 +23,7 @@ import (
slimclientset "github.com/cilium/cilium/pkg/k8s/slim/k8s/client/clientset/versioned"
"github.com/cilium/cilium/pkg/k8s/utils"
"github.com/cilium/cilium/pkg/k8s/watchers/resources"
"github.com/cilium/cilium/pkg/kvstore"
"github.com/cilium/cilium/pkg/kvstore/store"
"github.com/cilium/cilium/pkg/loadbalancer"
"github.com/cilium/cilium/pkg/lock"
Expand All @@ -38,7 +38,7 @@ var (
// k8sSvcCacheSynced is used do signalize when all services are synced with
// k8s.
k8sSvcCacheSynced = make(chan struct{})
kvs *store.SharedStore
kvs store.SyncStore
)

func k8sEventMetric(scope, action string) {
Expand All @@ -53,27 +53,32 @@ func k8sServiceHandler(ctx context.Context, clusterName string, shared bool, clu
svc.Cluster = clusterName
svc.ClusterID = clusterID

log.WithFields(logrus.Fields{
scopedLog := log.WithFields(logrus.Fields{
logfields.K8sSvcName: event.ID.Name,
logfields.K8sNamespace: event.ID.Namespace,
"action": event.Action.String(),
"service": event.Service.String(),
"endpoints": event.Endpoints.String(),
"shared": event.Service.Shared,
}).Debug("Kubernetes service definition changed")
})
scopedLog.Debug("Kubernetes service definition changed")

if shared && !event.Service.Shared {
// The annotation may have been added, delete an eventual existing service
kvs.DeleteLocalKey(ctx, &svc)
kvs.DeleteKey(ctx, &svc)
return
}

switch event.Action {
case k8s.UpdateService:
kvs.UpdateLocalKeySync(ctx, &svc)
if err := kvs.UpsertKey(ctx, &svc); err != nil {
// An error is triggered only in case it concerns service marshaling,
// as kvstore operations are automatically re-tried in case of error.
scopedLog.WithError(err).Warning("Failed synchronizing service")
}

case k8s.DeleteService:
kvs.DeleteLocalKey(ctx, &svc)
kvs.DeleteKey(ctx, &svc)
}
}
for {
Expand Down Expand Up @@ -117,23 +122,11 @@ func StartSynchronizingServices(ctx context.Context, wg *sync.WaitGroup, clients
readyChan := make(chan struct{}, 0)

go func() {
store, err := store.JoinSharedStore(store.Configuration{
Prefix: serviceStore.ServiceStorePrefix,
SynchronizationInterval: 5 * time.Minute,
KeyCreator: func() store.Key {
return &serviceStore.ClusterService{}
},
Backend: nil,
Observer: nil,
Context: ctx,
})

if err != nil {
log.WithError(err).Fatal("Unable to join kvstore store to announce services")
}

store := store.NewWorkqueueSyncStore(kvstore.Client(), serviceStore.ServiceStorePrefix,
store.WSSWithSourceClusterName(cfg.LocalClusterName()))
kvs = store
close(readyChan)
store.Run(ctx)
}()

swgSvcs := lock.NewStoppableWaitGroup()
Expand Down
8 changes: 4 additions & 4 deletions pkg/kvstore/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,18 +155,18 @@ type BackendOperations interface {
// Set sets value of key
Set(ctx context.Context, key string, value []byte) error

// Delete deletes a key
// Delete deletes a key. It does not return an error if the key does not exist.
Delete(ctx context.Context, key string) error

// DeleteIfLocked deletes a key if the client is still holding the given lock.
// DeleteIfLocked deletes a key if the client is still holding the given lock. It does not return an error if the key does not exist.
DeleteIfLocked(ctx context.Context, key string, lock KVLocker) error

DeletePrefix(ctx context.Context, path string) error

// Update atomically creates a key or fails if it already exists
// Update creates or updates a key.
Update(ctx context.Context, key string, value []byte, lease bool) error

// UpdateIfLocked atomically creates a key or fails if it already exists if the client is still holding the given lock.
// UpdateIfLocked updates a key if the client is still holding the given lock.
UpdateIfLocked(ctx context.Context, key string, value []byte, lease bool, lock KVLocker) error

// UpdateIfDifferent updates a key if the value is different
Expand Down
2 changes: 1 addition & 1 deletion pkg/kvstore/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ func (c *consulClient) getPrefix(ctx context.Context, prefix string) (k string,
return pairs[0].Key, pairs[0].Value, nil
}

// UpdateIfLocked atomically creates a key or fails if it already exists if the client is still holding the given lock.
// UpdateIfLocked updates a key if the client is still holding the given lock.
func (c *consulClient) UpdateIfLocked(ctx context.Context, key string, value []byte, lease bool, lock KVLocker) error {
return c.Update(ctx, key, value, lease)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kvstore/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -1357,7 +1357,7 @@ func (e *etcdClient) createOpPut(key string, value []byte, leaseID client.LeaseI
return &op
}

// UpdateIfLocked atomically creates a key or fails if it already exists if the client is still holding the given lock.
// UpdateIfLocked updates a key if the client is still holding the given lock.
func (e *etcdClient) UpdateIfLocked(ctx context.Context, key string, value []byte, lease bool, lock KVLocker) error {
if err := e.waitForInitialSession(ctx); err != nil {
return err
Expand Down
3 changes: 2 additions & 1 deletion pkg/kvstore/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ func (s *independentSuite) TestValidateScopesFromKey(c *C) {
"cilium/state/ip/v1/default/10.15.189.183": "ip/v1",
"cilium/state/ip/v1/default/f00d::a0f:0:0:6f2e": "ip/v1",
"cilium/state/nodes/v1/default/runtime": "nodes/v1",
"cilium/state/nodes/v1": "nodes/v1",
}

for key, val := range mockData {
c.Assert(getScopeFromKey(key), Equals, val)
c.Assert(GetScopeFromKey(key), Equals, val)
}
}
8 changes: 4 additions & 4 deletions pkg/kvstore/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ const (
metricSet = "set"
)

func getScopeFromKey(key string) string {
func GetScopeFromKey(key string) string {
s := strings.SplitN(key, "/", 5)
if len(s) != 5 {
if len(s) < 4 {
marseel marked this conversation as resolved.
Show resolved Hide resolved
if len(key) >= 12 {
return key[:12]
}
Expand All @@ -33,7 +33,7 @@ func increaseMetric(key, kind, action string, duration time.Duration, err error)
if !option.Config.MetricsConfig.KVStoreOperationsDurationEnabled {
return
}
namespace := getScopeFromKey(key)
namespace := GetScopeFromKey(key)
outcome := metrics.Error2Outcome(err)
metrics.KVStoreOperationsDuration.
WithLabelValues(namespace, kind, action, outcome).Observe(duration.Seconds())
Expand All @@ -43,7 +43,7 @@ func trackEventQueued(key string, typ EventType, duration time.Duration) {
if !option.Config.MetricsConfig.KVStoreEventsQueueDurationEnabled {
return
}
metrics.KVStoreEventsQueueDuration.WithLabelValues(getScopeFromKey(key), typ.String()).Observe(duration.Seconds())
metrics.KVStoreEventsQueueDuration.WithLabelValues(GetScopeFromKey(key), typ.String()).Observe(duration.Seconds())
}

func recordQuorumError(err string) {
Expand Down