Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set Consul Service Instance Localities from K8s Node Labels #2346

Merged
merged 1 commit into from Jun 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/2346.txt
@@ -0,0 +1,3 @@
```release-note:feature
Set locality on services registered with connect-inject.
```
Expand Up @@ -318,6 +318,20 @@ func (r *Controller) registerServicesAndHealthCheck(apiClient *api.Client, pod c
return nil
}

func parseLocality(node corev1.Node) *api.Locality {
region := node.Labels[corev1.LabelTopologyRegion]
zone := node.Labels[corev1.LabelTopologyZone]

if region == "" {
return nil
}

return &api.Locality{
Region: region,
Zone: zone,
}
}

// registerGateway creates Consul registrations for the Connect Gateways and registers them with Consul.
// It also upserts a Kubernetes health check for the service based on whether the endpoint address is ready.
func (r *Controller) registerGateway(apiClient *api.Client, pod corev1.Pod, serviceEndpoints corev1.Endpoints, healthStatus string, endpointAddressMap map[string]bool) error {
Expand Down Expand Up @@ -405,6 +419,11 @@ func (r *Controller) createServiceRegistrations(pod corev1.Pod, serviceEndpoints
}
}

var node corev1.Node
// Ignore errors because we don't want failures to block running services.
_ = r.Client.Get(context.Background(), types.NamespacedName{Name: pod.Spec.NodeName, Namespace: pod.Namespace}, &node)
locality := parseLocality(node)

// We only want that annotation to be present when explicitly overriding the consul svc name
// Otherwise, the Consul service name should equal the Kubernetes Service name.
// The service name in Consul defaults to the Endpoints object name, and is overridden by the pod
Expand Down Expand Up @@ -440,6 +459,7 @@ func (r *Controller) createServiceRegistrations(pod corev1.Pod, serviceEndpoints
Meta: meta,
Namespace: consulNS,
Tags: tags,
Locality: locality,
}
serviceRegistration := &api.CatalogRegistration{
Node: common.ConsulNodeNameFromK8sNode(pod.Spec.NodeName),
Expand Down
Expand Up @@ -1938,6 +1938,17 @@ func TestReconcileCreateEndpoint(t *testing.T) {
pod1.Annotations[constants.AnnotationUpstreams] = "upstream1:1234"
pod1.Annotations[constants.AnnotationEnableMetrics] = "true"
pod1.Annotations[constants.AnnotationPrometheusScrapePort] = "12345"
pod1.Spec.NodeName = "my-node"
node := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "my-node",
Namespace: "default",
Labels: map[string]string{
corev1.LabelTopologyRegion: "us-west-1",
corev1.LabelTopologyZone: "us-west-1a",
},
},
}
endpoint := &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "service-created",
Expand All @@ -1958,7 +1969,7 @@ func TestReconcileCreateEndpoint(t *testing.T) {
},
},
}
return []runtime.Object{pod1, endpoint}
return []runtime.Object{pod1, node, endpoint}
},
expectedConsulSvcInstances: []*api.CatalogService{
{
Expand All @@ -1978,6 +1989,10 @@ func TestReconcileCreateEndpoint(t *testing.T) {
},
ServiceTags: []string{"abc,123", "pod1"},
ServiceProxy: &api.AgentServiceConnectProxyConfig{},
ServiceLocality: &api.Locality{
Region: "us-west-1",
Zone: "us-west-1a",
},
},
},
expectedProxySvcInstances: []*api.CatalogService{
Expand Down Expand Up @@ -2190,6 +2205,7 @@ func TestReconcileCreateEndpoint(t *testing.T) {
require.Equal(t, tt.expectedConsulSvcInstances[i].ServicePort, instance.ServicePort)
require.Equal(t, tt.expectedConsulSvcInstances[i].ServiceMeta, instance.ServiceMeta)
require.Equal(t, tt.expectedConsulSvcInstances[i].ServiceTags, instance.ServiceTags)
require.Equal(t, tt.expectedConsulSvcInstances[i].ServiceLocality, instance.ServiceLocality)
require.Equal(t, tt.expectedConsulSvcInstances[i].ServiceTaggedAddresses, instance.ServiceTaggedAddresses)
require.Equal(t, tt.expectedConsulSvcInstances[i].ServiceProxy, instance.ServiceProxy)
if tt.nodeMeta != nil {
Expand Down Expand Up @@ -2236,6 +2252,36 @@ func TestReconcileCreateEndpoint(t *testing.T) {
}
}

func TestParseLocality(t *testing.T) {
t.Run("no labels", func(t *testing.T) {
n := corev1.Node{}
require.Nil(t, parseLocality(n))
})

t.Run("zone only", func(t *testing.T) {
n := corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
corev1.LabelTopologyZone: "us-west-1a",
},
},
}
require.Nil(t, parseLocality(n))
})

t.Run("everything", func(t *testing.T) {
n := corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
corev1.LabelTopologyRegion: "us-west-1",
corev1.LabelTopologyZone: "us-west-1a",
},
},
}
require.Equal(t, &api.Locality{Region: "us-west-1", Zone: "us-west-1a"}, parseLocality(n))
})
}

// Tests updating an Endpoints object.
// - Tests updates via the register codepath:
// - When an address in an Endpoint is updated, that the corresponding service instance in Consul is updated.
Expand Down