From 752eca447a24ad964ea8bd7b09217e4b62aa702e Mon Sep 17 00:00:00 2001 From: Nitya Dhanushkodi Date: Sun, 21 Mar 2021 19:33:54 -0700 Subject: [PATCH] Cleanup and address review comments --- connect-inject/endpoints_controller.go | 39 +++++++-------------- connect-inject/endpoints_controller_test.go | 10 +++--- subcommand/inject-connect/command.go | 3 +- 3 files changed, 19 insertions(+), 33 deletions(-) diff --git a/connect-inject/endpoints_controller.go b/connect-inject/endpoints_controller.go index 8aa916ec8d..8f3f1c864f 100644 --- a/connect-inject/endpoints_controller.go +++ b/connect-inject/endpoints_controller.go @@ -55,18 +55,18 @@ func (r *EndpointsController) Reconcile(req ctrl.Request) (ctrl.Result, error) { // error), we need to deregister all instances in Consul for that service. if k8serrors.IsNotFound(err) { // Deregister all instances in Consul for this service. The function deregisterServiceOnAllAgents handles - // the case where the Consul service name is different from the K8s service name. - err := r.deregisterServiceOnAllAgents(req.Name, req.Namespace, false, nil) - if err != nil { + // the case where the Consul service name is different from the Kubernetes service name. + if err := r.deregisterServiceOnAllAgents(req.Name, req.Namespace, false, nil); err != nil { return ctrl.Result{}, err } + return ctrl.Result{}, nil } else if err != nil { - r.Log.Error(err, "failed to get endpoints from Kubernetes", "namespace", req.Namespace, "name", req.Name) + r.Log.Error(err, "failed to get Endpoints from Kubernetes", "name", req.Name, "namespace", req.Namespace) return ctrl.Result{}, err } - r.Log.Info("retrieved service from kube", "serviceEndpoints", serviceEndpoints) + r.Log.Info("retrieved Kubernetes Endpoints", "serviceEndpoints", serviceEndpoints.Name, "serviceEndpoints namespace", serviceEndpoints.Namespace) // endpointAddressMap stores every IP that corresponds to a Pod in the Endpoints object. It is used to compare // against service instances in Consul to deregister them if they are not in the map. @@ -92,7 +92,7 @@ func (r *EndpointsController) Reconcile(req ctrl.Request) (ctrl.Result, error) { return ctrl.Result{}, err } - if hasBeenInjected(&pod) { + if hasBeenInjected(pod) { // Create client for Consul agent local to the pod. client, err := r.getConsulClient(pod.Status.HostIP) if err != nil { @@ -131,8 +131,7 @@ func (r *EndpointsController) Reconcile(req ctrl.Request) (ctrl.Result, error) { // Compare service instances in Consul with addresses in Endpoints. If an address is not in Endpoints, deregister // from Consul. This uses endpointAddressMap which is populated with the addresses in the Endpoints object during // the registration codepath. - err = r.deregisterServiceOnAllAgents(serviceEndpoints.Name, serviceEndpoints.Namespace, true, endpointAddressMap) - if err != nil { + if err = r.deregisterServiceOnAllAgents(serviceEndpoints.Name, serviceEndpoints.Namespace, true, endpointAddressMap); err != nil { r.Log.Error(err, "failed to deregister service instances on all agents", "k8s service name", serviceEndpoints.Name, "k8s namespace", serviceEndpoints.Namespace) return ctrl.Result{}, err } @@ -154,11 +153,10 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service // TODO: remove logic in handler to always set the service name annotation // We only want that annotation to be present when explicitly overriding the consul svc name - // Otherwise, the Consul service name should equal the K8s Service name. + // Otherwise, the Consul service name should equal the Kubernetes Service name. // The service name in Consul defaults to the Endpoints object name, and is overridden by the pod // annotation annotationService. - var serviceName string - serviceName = serviceEndpoints.Name + serviceName := serviceEndpoints.Name if raw, ok := pod.Annotations[annotationService]; ok && raw != "" { serviceName = raw } @@ -295,7 +293,8 @@ func (r *EndpointsController) deregisterServiceOnAllAgents(k8sSvcName, k8sSvcNam // Deregister each service instance that matches the metadata. for svcID, serviceRegistration := range svcs { r.Log.Info("deregistering service", "service id", svcID) - // Todo comment + // If we selectively deregister, only deregister if the address is not in the map. Otherwise, deregister + // every service instance. if selectivelyDeregister { if _, ok := endpointsAddressesMap[serviceRegistration.Address]; !ok { // If the service address is not in the Endpoints addresses, deregister it. @@ -310,13 +309,7 @@ func (r *EndpointsController) deregisterServiceOnAllAgents(k8sSvcName, k8sSvcNam return err } } - //err = client.Agent().ServiceDeregister(svcID) - //if err != nil { - // r.Log.Error(err, "failed to deregister service instance", "ID", svcID) - // return err - //} } - } return nil } @@ -399,17 +392,11 @@ func (r *EndpointsController) SetupWithManager(mgr ctrl.Manager) error { // getConsulClient returns an *api.Client that points at the consul agent local to the pod. func (r *EndpointsController) getConsulClient(ip string) (*api.Client, error) { - // todo: un-hardcode the scheme and port newAddr := fmt.Sprintf("%s://%s:%s", r.ConsulScheme, ip, r.ConsulPort) localConfig := api.DefaultConfig() localConfig.Address = newAddr - localClient, err := consul.NewClient(localConfig) - if err != nil { - return nil, err - } - - return localClient, err + return consul.NewClient(localConfig) } // shouldIgnore ignores namespaces where we don't connect-inject. @@ -435,7 +422,7 @@ func shouldIgnore(namespace string, denySet, allowSet mapset.Set) bool { } // hasBeenInjected checks the value of the status annotation and returns true if the Pod has been injected. -func hasBeenInjected(pod *corev1.Pod) bool { +func hasBeenInjected(pod corev1.Pod) bool { if anno, ok := pod.Annotations[annotationStatus]; ok { if anno == injected { return true diff --git a/connect-inject/endpoints_controller_test.go b/connect-inject/endpoints_controller_test.go index 7c830c981c..44a8a09177 100644 --- a/connect-inject/endpoints_controller_test.go +++ b/connect-inject/endpoints_controller_test.go @@ -75,22 +75,22 @@ func TestHasBeenInjected(t *testing.T) { t.Parallel() cases := []struct { name string - pod func() *corev1.Pod + pod func() corev1.Pod expected bool }{ { name: "Pod with annotation", - pod: func() *corev1.Pod { + pod: func() corev1.Pod { pod1 := createPod("pod1", "1.2.3.4", true) - return pod1 + return *pod1 }, expected: true, }, { name: "Pod without injected annotation", - pod: func() *corev1.Pod { + pod: func() corev1.Pod { pod1 := createPod("pod1", "1.2.3.4", false) - return pod1 + return *pod1 }, expected: false, }, diff --git a/subcommand/inject-connect/command.go b/subcommand/inject-connect/command.go index 89263f6f0e..766d9de094 100644 --- a/subcommand/inject-connect/command.go +++ b/subcommand/inject-connect/command.go @@ -458,7 +458,6 @@ func (c *Command) Run(args []string) int { ctrlExitCh := make(chan error) // Create a manager for endpoints controller and the mutating webhook. - // Note: the webhook refactor PR will use this manager for the mutating webhook. zapLogger := zap.New(zap.UseDevMode(true), zap.Level(zapcore.InfoLevel)) ctrl.SetLogger(zapLogger) klog.SetLogger(zapLogger) @@ -472,7 +471,7 @@ func (c *Command) Run(args []string) int { setupLog.Error(err, "unable to start manager") return 1 } - // Start the endpoints controller + // Start the endpoints controller. if err = (&connectinject.EndpointsController{ Client: mgr.GetClient(), ConsulClient: c.consulClient,