diff --git a/.changelog/2346.txt b/.changelog/2346.txt new file mode 100644 index 0000000000..fb062ee0fb --- /dev/null +++ b/.changelog/2346.txt @@ -0,0 +1,3 @@ +```release-note:feature +Set locality on services registered with connect-inject. +``` diff --git a/control-plane/connect-inject/controllers/endpoints/endpoints_controller.go b/control-plane/connect-inject/controllers/endpoints/endpoints_controller.go index eeaeeab485..740cf276dd 100644 --- a/control-plane/connect-inject/controllers/endpoints/endpoints_controller.go +++ b/control-plane/connect-inject/controllers/endpoints/endpoints_controller.go @@ -318,6 +318,20 @@ func (r *Controller) registerServicesAndHealthCheck(apiClient *api.Client, pod c return nil } +func parseLocality(node corev1.Node) *api.Locality { + region := node.Labels[corev1.LabelTopologyRegion] + zone := node.Labels[corev1.LabelTopologyZone] + + if region == "" { + return nil + } + + return &api.Locality{ + Region: region, + Zone: zone, + } +} + // registerGateway creates Consul registrations for the Connect Gateways and registers them with Consul. // It also upserts a Kubernetes health check for the service based on whether the endpoint address is ready. func (r *Controller) registerGateway(apiClient *api.Client, pod corev1.Pod, serviceEndpoints corev1.Endpoints, healthStatus string, endpointAddressMap map[string]bool) error { @@ -405,6 +419,11 @@ func (r *Controller) createServiceRegistrations(pod corev1.Pod, serviceEndpoints } } + var node corev1.Node + // Ignore errors because we don't want failures to block running services. + r.Client.Get(context.Background(), types.NamespacedName{Name: pod.Spec.NodeName, Namespace: pod.Namespace}, &node) + locality := parseLocality(node) + // We only want that annotation to be present when explicitly overriding the consul svc name // Otherwise, the Consul service name should equal the Kubernetes Service name. // The service name in Consul defaults to the Endpoints object name, and is overridden by the pod @@ -440,6 +459,7 @@ func (r *Controller) createServiceRegistrations(pod corev1.Pod, serviceEndpoints Meta: meta, Namespace: consulNS, Tags: tags, + Locality: locality, } serviceRegistration := &api.CatalogRegistration{ Node: common.ConsulNodeNameFromK8sNode(pod.Spec.NodeName), diff --git a/control-plane/connect-inject/controllers/endpoints/endpoints_controller_test.go b/control-plane/connect-inject/controllers/endpoints/endpoints_controller_test.go index acf62b2b0e..ea1ce686d6 100644 --- a/control-plane/connect-inject/controllers/endpoints/endpoints_controller_test.go +++ b/control-plane/connect-inject/controllers/endpoints/endpoints_controller_test.go @@ -1938,6 +1938,17 @@ func TestReconcileCreateEndpoint(t *testing.T) { pod1.Annotations[constants.AnnotationUpstreams] = "upstream1:1234" pod1.Annotations[constants.AnnotationEnableMetrics] = "true" pod1.Annotations[constants.AnnotationPrometheusScrapePort] = "12345" + pod1.Spec.NodeName = "my-node" + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-node", + Namespace: "default", + Labels: map[string]string{ + corev1.LabelTopologyRegion: "us-west-1", + corev1.LabelTopologyZone: "us-west-1a", + }, + }, + } endpoint := &corev1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "service-created", @@ -1958,7 +1969,7 @@ func TestReconcileCreateEndpoint(t *testing.T) { }, }, } - return []runtime.Object{pod1, endpoint} + return []runtime.Object{pod1, node, endpoint} }, expectedConsulSvcInstances: []*api.CatalogService{ { @@ -1978,6 +1989,10 @@ func TestReconcileCreateEndpoint(t *testing.T) { }, ServiceTags: []string{"abc,123", "pod1"}, ServiceProxy: &api.AgentServiceConnectProxyConfig{}, + ServiceLocality: &api.Locality{ + Region: "us-west-1", + Zone: "us-west-1a", + }, }, }, expectedProxySvcInstances: []*api.CatalogService{ @@ -2190,6 +2205,7 @@ func TestReconcileCreateEndpoint(t *testing.T) { require.Equal(t, tt.expectedConsulSvcInstances[i].ServicePort, instance.ServicePort) require.Equal(t, tt.expectedConsulSvcInstances[i].ServiceMeta, instance.ServiceMeta) require.Equal(t, tt.expectedConsulSvcInstances[i].ServiceTags, instance.ServiceTags) + require.Equal(t, tt.expectedConsulSvcInstances[i].ServiceLocality, instance.ServiceLocality) require.Equal(t, tt.expectedConsulSvcInstances[i].ServiceTaggedAddresses, instance.ServiceTaggedAddresses) require.Equal(t, tt.expectedConsulSvcInstances[i].ServiceProxy, instance.ServiceProxy) if tt.nodeMeta != nil { @@ -2236,6 +2252,36 @@ func TestReconcileCreateEndpoint(t *testing.T) { } } +func TestParseLocality(t *testing.T) { + t.Run("no labels", func(t *testing.T) { + n := corev1.Node{} + require.Nil(t, parseLocality(n)) + }) + + t.Run("zone only", func(t *testing.T) { + n := corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + corev1.LabelTopologyZone: "us-west-1a", + }, + }, + } + require.Nil(t, parseLocality(n)) + }) + + t.Run("everything", func(t *testing.T) { + n := corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + corev1.LabelTopologyRegion: "us-west-1", + corev1.LabelTopologyZone: "us-west-1a", + }, + }, + } + require.Equal(t, &api.Locality{Region: "us-west-1", Zone: "us-west-1a"}, parseLocality(n)) + }) +} + // Tests updating an Endpoints object. // - Tests updates via the register codepath: // - When an address in an Endpoint is updated, that the corresponding service instance in Consul is updated.