Skip to content

Commit

Permalink
clustermesh: use SyncStore to synchronize services
Browse files Browse the repository at this point in the history
This commit updates the service synchronization logic to use the newly
introduced SyncStore implementation, to benefit from the underlying
workqueue to coalescence multiple updates concerning the same service
and automatically handle temporary failures concerning kvstore
operations.

Signed-off-by: Marco Iorio <marco.iorio@isovalent.com>
  • Loading branch information
giorio94 authored and borkmann committed May 16, 2023
1 parent 6de16c4 commit 087b502
Showing 1 changed file with 15 additions and 22 deletions.
37 changes: 15 additions & 22 deletions operator/watchers/k8s_service_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package watchers
import (
"context"
"sync"
"time"

"github.com/sirupsen/logrus"
v1meta "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -24,6 +23,7 @@ import (
slimclientset "github.com/cilium/cilium/pkg/k8s/slim/k8s/client/clientset/versioned"
"github.com/cilium/cilium/pkg/k8s/utils"
"github.com/cilium/cilium/pkg/k8s/watchers/resources"
"github.com/cilium/cilium/pkg/kvstore"
"github.com/cilium/cilium/pkg/kvstore/store"
"github.com/cilium/cilium/pkg/loadbalancer"
"github.com/cilium/cilium/pkg/lock"
Expand All @@ -38,7 +38,7 @@ var (
// k8sSvcCacheSynced is used do signalize when all services are synced with
// k8s.
k8sSvcCacheSynced = make(chan struct{})
kvs *store.SharedStore
kvs store.SyncStore
)

func k8sEventMetric(scope, action string) {
Expand All @@ -53,27 +53,32 @@ func k8sServiceHandler(ctx context.Context, clusterName string, shared bool, clu
svc.Cluster = clusterName
svc.ClusterID = clusterID

log.WithFields(logrus.Fields{
scopedLog := log.WithFields(logrus.Fields{
logfields.K8sSvcName: event.ID.Name,
logfields.K8sNamespace: event.ID.Namespace,
"action": event.Action.String(),
"service": event.Service.String(),
"endpoints": event.Endpoints.String(),
"shared": event.Service.Shared,
}).Debug("Kubernetes service definition changed")
})
scopedLog.Debug("Kubernetes service definition changed")

if shared && !event.Service.Shared {
// The annotation may have been added, delete an eventual existing service
kvs.DeleteLocalKey(ctx, &svc)
kvs.DeleteKey(ctx, &svc)
return
}

switch event.Action {
case k8s.UpdateService:
kvs.UpdateLocalKeySync(ctx, &svc)
if err := kvs.UpsertKey(ctx, &svc); err != nil {
// An error is triggered only in case it concerns service marshaling,
// as kvstore operations are automatically re-tried in case of error.
scopedLog.WithError(err).Warning("Failed synchronizing service")
}

case k8s.DeleteService:
kvs.DeleteLocalKey(ctx, &svc)
kvs.DeleteKey(ctx, &svc)
}
}
for {
Expand Down Expand Up @@ -117,23 +122,11 @@ func StartSynchronizingServices(ctx context.Context, wg *sync.WaitGroup, clients
readyChan := make(chan struct{}, 0)

go func() {
store, err := store.JoinSharedStore(store.Configuration{
Prefix: serviceStore.ServiceStorePrefix,
SynchronizationInterval: 5 * time.Minute,
KeyCreator: func() store.Key {
return &serviceStore.ClusterService{}
},
Backend: nil,
Observer: nil,
Context: ctx,
})

if err != nil {
log.WithError(err).Fatal("Unable to join kvstore store to announce services")
}

store := store.NewWorkqueueSyncStore(kvstore.Client(), serviceStore.ServiceStorePrefix,
store.WSSWithSourceClusterName(cfg.LocalClusterName()))
kvs = store
close(readyChan)
store.Run(ctx)
}()

swgSvcs := lock.NewStoppableWaitGroup()
Expand Down

0 comments on commit 087b502

Please sign in to comment.