Skip to content

Commit

Permalink
kvstore: Propagate ClusterID with Service
Browse files Browse the repository at this point in the history
This is a part of the Cluster Mesh with overlapping PodCIDR support.
Remote cluster puts ClusterID information to the service. So that the
cluster with cluster-aware addressing enabled can annotate service
backend IP with ClusterID.

Signed-off-by: Yutaro Hayakawa <yutaro.hayakawa@isovalent.com>
  • Loading branch information
YutaroHayakawa authored and pchaigno committed Feb 21, 2023
1 parent f3599ce commit 3b05104
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 10 deletions.
18 changes: 11 additions & 7 deletions clustermesh-apiserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,18 @@ import (

type configuration struct {
clusterName string
clusterID uint32
serviceProxyName string
}

func (c configuration) LocalClusterName() string {
return c.clusterName
}

func (c configuration) LocalClusterID() uint32 {
return c.clusterID
}

func (c configuration) K8sServiceProxyNameValue() string {
return c.serviceProxyName
}
Expand Down Expand Up @@ -95,9 +100,8 @@ var (
},
}

mockFile string
clusterID uint32
cfg configuration
mockFile string
cfg configuration

ciliumNodeStore *store.SharedStore

Expand Down Expand Up @@ -206,7 +210,7 @@ func runApiserver() error {
flags.String(option.IdentityAllocationMode, option.IdentityAllocationModeCRD, "Method to use for identity allocation")
option.BindEnv(vp, option.IdentityAllocationMode)

flags.Uint32Var(&clusterID, option.ClusterIDName, 0, "Cluster ID")
flags.Uint32Var(&cfg.clusterID, option.ClusterIDName, 0, "Cluster ID")
option.BindEnv(vp, option.ClusterIDName)

flags.StringVar(&cfg.clusterName, option.ClusterName, "default", "Cluster name")
Expand Down Expand Up @@ -364,7 +368,7 @@ func updateNode(obj interface{}) {
if ciliumNode, ok := obj.(*ciliumv2.CiliumNode); ok {
n := nodeTypes.ParseCiliumNode(ciliumNode)
n.Cluster = cfg.clusterName
n.ClusterID = clusterID
n.ClusterID = cfg.clusterID
if err := ciliumNodeStore.UpdateLocalKeySync(context.Background(), &n); err != nil {
log.WithError(err).Warning("Unable to insert node into etcd")
} else {
Expand Down Expand Up @@ -558,7 +562,7 @@ func synchronizeCiliumEndpoints(clientset k8sClient.Clientset) {
func startServer(startCtx hive.HookContext, clientset k8sClient.Clientset, services resource.Resource[*slim_corev1.Service]) {
log.WithFields(logrus.Fields{
"cluster-name": cfg.clusterName,
"cluster-id": clusterID,
"cluster-id": cfg.clusterID,
}).Info("Starting clustermesh-apiserver...")

if mockFile == "" {
Expand All @@ -573,7 +577,7 @@ func startServer(startCtx hive.HookContext, clientset k8sClient.Clientset, servi
}

config := cmtypes.CiliumClusterConfig{
ID: clusterID,
ID: cfg.clusterID,
}

if err := clustermesh.SetClusterConfig(cfg.clusterName, &config, kvstore.Client()); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion clustermesh-apiserver/vmmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func nodeOverrideFromCEW(n *nodeTypes.RegisterNode, cew *ciliumv2.CiliumExternal

// Override cluster
nk.Cluster = cfg.clusterName
nk.ClusterID = clusterID
nk.ClusterID = cfg.clusterID
nk.Labels[k8sConst.PolicyLabelCluster] = cfg.clusterName

// Override CIDRs if defined
Expand Down
8 changes: 6 additions & 2 deletions operator/watchers/k8s_service_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,13 @@ func k8sEventMetric(scope, action string) {
metrics.EventTS.WithLabelValues(metrics.LabelEventSourceK8s, scope, action)
}

func k8sServiceHandler(clusterName string, shared bool) {
func k8sServiceHandler(clusterName string, shared bool, clusterID uint32) {
serviceHandler := func(event k8s.ServiceEvent) {
defer event.SWG.Done()

svc := k8s.NewClusterService(event.ID, event.Service, event.Endpoints)
svc.Cluster = clusterName
svc.ClusterID = clusterID

log.WithFields(logrus.Fields{
logfields.K8sSvcName: event.ID.Name,
Expand Down Expand Up @@ -90,6 +91,9 @@ type ServiceSyncConfiguration interface {
// LocalClusterName must return the local cluster name
LocalClusterName() string

// LocalClusterID must return the local cluster id
LocalClusterID() uint32

utils.ServiceConfiguration
}

Expand Down Expand Up @@ -188,7 +192,7 @@ func StartSynchronizingServices(ctx context.Context, wg *sync.WaitGroup, clients

<-readyChan
log.Info("Starting to synchronize Kubernetes services to kvstore")
k8sServiceHandler(cfg.LocalClusterName(), shared)
k8sServiceHandler(cfg.LocalClusterName(), shared, cfg.LocalClusterID())
}()
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/service/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ type ClusterService struct {

// Shared is true when the service should be exposed/shared to other clusters
Shared bool `json:"shared"`

// ClusterID is the cluster ID the service is configured in
ClusterID uint32 `json:"clusterID"`
}

func (s *ClusterService) String() string {
Expand Down

0 comments on commit 3b05104

Please sign in to comment.