Skip to content

Commit

Permalink
Set Consul service instance localities from K8s node labels
Browse files Browse the repository at this point in the history
  • Loading branch information
erichaberkorn committed Jun 13, 2023
1 parent 60b214e commit db14384
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 1 deletion.
3 changes: 3 additions & 0 deletions .changelog/2346.txt
@@ -0,0 +1,3 @@
```release-note:feature
Set locality on services registered with connect-inject.
```
Expand Up @@ -318,6 +318,20 @@ func (r *Controller) registerServicesAndHealthCheck(apiClient *api.Client, pod c
return nil
}

func parseLocality(node corev1.Node) *api.Locality {
region := node.Labels[corev1.LabelTopologyRegion]
zone := node.Labels[corev1.LabelTopologyZone]

if region == "" {
return nil
}

return &api.Locality{
Region: region,
Zone: zone,
}
}

// registerGateway creates Consul registrations for the Connect Gateways and registers them with Consul.
// It also upserts a Kubernetes health check for the service based on whether the endpoint address is ready.
func (r *Controller) registerGateway(apiClient *api.Client, pod corev1.Pod, serviceEndpoints corev1.Endpoints, healthStatus string, endpointAddressMap map[string]bool) error {
Expand Down Expand Up @@ -405,6 +419,11 @@ func (r *Controller) createServiceRegistrations(pod corev1.Pod, serviceEndpoints
}
}

var node corev1.Node
// Ignore errors because we don't want failures to block running services.
r.Client.Get(context.Background(), types.NamespacedName{Name: pod.Spec.NodeName, Namespace: pod.Namespace}, &node)
locality := parseLocality(node)

// We only want that annotation to be present when explicitly overriding the consul svc name
// Otherwise, the Consul service name should equal the Kubernetes Service name.
// The service name in Consul defaults to the Endpoints object name, and is overridden by the pod
Expand Down Expand Up @@ -440,6 +459,7 @@ func (r *Controller) createServiceRegistrations(pod corev1.Pod, serviceEndpoints
Meta: meta,
Namespace: consulNS,
Tags: tags,
Locality: locality,
}
serviceRegistration := &api.CatalogRegistration{
Node: common.ConsulNodeNameFromK8sNode(pod.Spec.NodeName),
Expand Down
Expand Up @@ -1938,6 +1938,17 @@ func TestReconcileCreateEndpoint(t *testing.T) {
pod1.Annotations[constants.AnnotationUpstreams] = "upstream1:1234"
pod1.Annotations[constants.AnnotationEnableMetrics] = "true"
pod1.Annotations[constants.AnnotationPrometheusScrapePort] = "12345"
pod1.Spec.NodeName = "my-node"
node := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "my-node",
Namespace: "default",
Labels: map[string]string{
corev1.LabelTopologyRegion: "us-west-1",
corev1.LabelTopologyZone: "us-west-1a",
},
},
}
endpoint := &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "service-created",
Expand All @@ -1958,7 +1969,7 @@ func TestReconcileCreateEndpoint(t *testing.T) {
},
},
}
return []runtime.Object{pod1, endpoint}
return []runtime.Object{pod1, node, endpoint}
},
expectedConsulSvcInstances: []*api.CatalogService{
{
Expand All @@ -1978,6 +1989,10 @@ func TestReconcileCreateEndpoint(t *testing.T) {
},
ServiceTags: []string{"abc,123", "pod1"},
ServiceProxy: &api.AgentServiceConnectProxyConfig{},
ServiceLocality: &api.Locality{
Region: "us-west-1",
Zone: "us-west-1a",
},
},
},
expectedProxySvcInstances: []*api.CatalogService{
Expand Down Expand Up @@ -2190,6 +2205,7 @@ func TestReconcileCreateEndpoint(t *testing.T) {
require.Equal(t, tt.expectedConsulSvcInstances[i].ServicePort, instance.ServicePort)
require.Equal(t, tt.expectedConsulSvcInstances[i].ServiceMeta, instance.ServiceMeta)
require.Equal(t, tt.expectedConsulSvcInstances[i].ServiceTags, instance.ServiceTags)
require.Equal(t, tt.expectedConsulSvcInstances[i].ServiceLocality, instance.ServiceLocality)
require.Equal(t, tt.expectedConsulSvcInstances[i].ServiceTaggedAddresses, instance.ServiceTaggedAddresses)
require.Equal(t, tt.expectedConsulSvcInstances[i].ServiceProxy, instance.ServiceProxy)
if tt.nodeMeta != nil {
Expand Down Expand Up @@ -2236,6 +2252,36 @@ func TestReconcileCreateEndpoint(t *testing.T) {
}
}

func TestParseLocality(t *testing.T) {
t.Run("no labels", func(t *testing.T) {
n := corev1.Node{}
require.Nil(t, parseLocality(n))
})

t.Run("zone only", func(t *testing.T) {
n := corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
corev1.LabelTopologyZone: "us-west-1a",
},
},
}
require.Nil(t, parseLocality(n))
})

t.Run("everything", func(t *testing.T) {
n := corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
corev1.LabelTopologyRegion: "us-west-1",
corev1.LabelTopologyZone: "us-west-1a",
},
},
}
require.Equal(t, &api.Locality{Region: "us-west-1", Zone: "us-west-1a"}, parseLocality(n))
})
}

// Tests updating an Endpoints object.
// - Tests updates via the register codepath:
// - When an address in an Endpoint is updated, that the corresponding service instance in Consul is updated.
Expand Down

0 comments on commit db14384

Please sign in to comment.