Skip to content

Commit

Permalink
Rebase master and update controller to use latest controller-runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
thisisnotashwin committed Mar 30, 2021
1 parent 16c3694 commit 37a4384
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 75 deletions.
30 changes: 15 additions & 15 deletions connect-inject/endpoints_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,24 +49,24 @@ type EndpointsController struct {
ReleaseNamespace string
Log logr.Logger
Scheme *runtime.Scheme
Ctx context.Context
context.Context
}

func (r *EndpointsController) Reconcile(req ctrl.Request) (ctrl.Result, error) {
func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
var serviceEndpoints corev1.Endpoints

if shouldIgnore(req.Namespace, r.DenyK8sNamespacesSet, r.AllowK8sNamespacesSet) {
return ctrl.Result{}, nil
}

err := r.Client.Get(r.Ctx, req.NamespacedName, &serviceEndpoints)
err := r.Client.Get(ctx, req.NamespacedName, &serviceEndpoints)

// If the endpoints object has been deleted (and we get an IsNotFound
// error), we need to deregister all instances in Consul for that service.
if k8serrors.IsNotFound(err) {
// Deregister all instances in Consul for this service. The function deregisterServiceOnAllAgents handles
// the case where the Consul service name is different from the Kubernetes service name.
if err = r.deregisterServiceOnAllAgents(req.Name, req.Namespace, nil); err != nil {
if err = r.deregisterServiceOnAllAgents(ctx, req.Name, req.Namespace, nil); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
Expand All @@ -93,7 +93,7 @@ func (r *EndpointsController) Reconcile(req ctrl.Request) (ctrl.Result, error) {
// Get pod associated with this address.
var pod corev1.Pod
objectKey := types.NamespacedName{Name: address.TargetRef.Name, Namespace: address.TargetRef.Namespace}
if err = r.Client.Get(r.Ctx, objectKey, &pod); err != nil {
if err = r.Client.Get(ctx, objectKey, &pod); err != nil {
r.Log.Error(err, "failed to get pod from Kubernetes", "pod-name", address.TargetRef.Name)
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -139,7 +139,7 @@ func (r *EndpointsController) Reconcile(req ctrl.Request) (ctrl.Result, error) {
// Compare service instances in Consul with addresses in Endpoints. If an address is not in Endpoints, deregister
// from Consul. This uses endpointAddressMap which is populated with the addresses in the Endpoints object during
// the registration codepath.
if err = r.deregisterServiceOnAllAgents(serviceEndpoints.Name, serviceEndpoints.Namespace, endpointAddressMap); err != nil {
if err = r.deregisterServiceOnAllAgents(ctx, serviceEndpoints.Name, serviceEndpoints.Namespace, endpointAddressMap); err != nil {
r.Log.Error(err, "failed to deregister service instances on all agents", "k8s-service-name", serviceEndpoints.Name, "k8s-namespace", serviceEndpoints.Namespace)
return ctrl.Result{}, err
}
Expand All @@ -156,7 +156,7 @@ func (r *EndpointsController) SetupWithManager(mgr ctrl.Manager) error {
For(&corev1.Endpoints{}).
Watches(
&source.Kind{Type: &corev1.Pod{}},
&handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(r.requestsForRunningAgentPods)},
handler.EnqueueRequestsFromMapFunc(r.requestsForRunningAgentPods),
builder.WithPredicates(predicate.NewPredicateFuncs(r.filterAgentPods)),
).Complete(r)
}
Expand Down Expand Up @@ -280,7 +280,7 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service
// The argument endpointsAddressesMap decides whether to deregister *all* service instances or selectively deregister
// them only if they are not in endpointsAddressesMap. If the map is nil, it will deregister all instances. If the map
// has addresses, it will only deregister instances not in the map.
func (r *EndpointsController) deregisterServiceOnAllAgents(k8sSvcName, k8sSvcNamespace string, endpointsAddressesMap map[string]bool) error {
func (r *EndpointsController) deregisterServiceOnAllAgents(ctx context.Context, k8sSvcName, k8sSvcNamespace string, endpointsAddressesMap map[string]bool) error {

// Get all agents by getting pods with label component=client, app=consul and release=<ReleaseName>
list := corev1.PodList{}
Expand All @@ -292,7 +292,7 @@ func (r *EndpointsController) deregisterServiceOnAllAgents(k8sSvcName, k8sSvcNam
"release": r.ReleaseName,
}),
}
if err := r.Client.List(r.Ctx, &list, &listOptions); err != nil {
if err := r.Client.List(ctx, &list, &listOptions); err != nil {
r.Log.Error(err, "failed to get agent pods from Kubernetes")
return err
}
Expand Down Expand Up @@ -445,8 +445,8 @@ func shouldIgnore(namespace string, denySet, allowSet mapset.Set) bool {
// which in this case are Pods. It only returns true if the Pod is a Consul Client Agent Pod. It reads the labels
// from the meta of the resource and uses the values of the "app" and "component" label to validate that
// the Pod is a Consul Client Agent.
func (r EndpointsController) filterAgentPods(meta metav1.Object, object runtime.Object) bool {
podLabels := meta.GetLabels()
func (r EndpointsController) filterAgentPods(object client.Object) bool {
podLabels := object.GetLabels()
app, ok := podLabels["app"]
if !ok {
return false
Expand All @@ -473,10 +473,10 @@ func (r EndpointsController) filterAgentPods(meta metav1.Object, object runtime.
// are on the same node as the new Consul Agent pod. It receives a Pod Object which is a
// Consul Agent that has been filtered by filterAgentPods and only enqueues endpoints
// for client agent pods where the Ready condition is true.
func (r EndpointsController) requestsForRunningAgentPods(object handler.MapObject) []ctrl.Request {
func (r EndpointsController) requestsForRunningAgentPods(object client.Object) []ctrl.Request {
var consulClientPod corev1.Pod
r.Log.Info("received update for consulClientPod", "podName", object.Meta.GetName())
err := r.Client.Get(r.Ctx, types.NamespacedName{Name: object.Meta.GetName(), Namespace: object.Meta.GetNamespace()}, &consulClientPod)
r.Log.Info("received update for consulClientPod", "podName", object.GetName())
err := r.Client.Get(r.Context, types.NamespacedName{Name: object.GetName(), Namespace: object.GetNamespace()}, &consulClientPod)
if k8serrors.IsNotFound(err) {
// Ignore if consulClientPod is not found.
return []ctrl.Request{}
Expand All @@ -503,7 +503,7 @@ func (r EndpointsController) requestsForRunningAgentPods(object handler.MapObjec

// Get the list of all endpoints.
var endpointsList corev1.EndpointsList
err = r.Client.List(r.Ctx, &endpointsList)
err = r.Client.List(r.Context, &endpointsList)
if err != nil {
r.Log.Error(err, "failed to list endpoints")
return []ctrl.Request{}
Expand Down
50 changes: 24 additions & 26 deletions connect-inject/endpoints_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/handler"
)

func TestShouldIgnore(t *testing.T) {
Expand Down Expand Up @@ -609,7 +609,7 @@ func TestReconcileCreateEndpoint(t *testing.T) {

// Create fake k8s client
k8sObjects := append(tt.k8sObjects(), fakeClientPod)
client := fake.NewFakeClient(k8sObjects...)
fakeClient := fake.NewClientBuilder().WithRuntimeObjects(k8sObjects...).Build()

// Create test consul server
consul, err := testutil.NewTestServerConfigT(t, func(c *testutil.TestServerConfig) {
Expand All @@ -634,7 +634,7 @@ func TestReconcileCreateEndpoint(t *testing.T) {

// Create the endpoints controller
ep := &EndpointsController{
Client: client,
Client: fakeClient,
Log: logrtest.TestLogger{T: t},
ConsulClient: consulClient,
ConsulPort: consulPort,
Expand All @@ -649,7 +649,7 @@ func TestReconcileCreateEndpoint(t *testing.T) {
Name: "service-created",
}

resp, err := ep.Reconcile(ctrl.Request{
resp, err := ep.Reconcile(context.Background(), ctrl.Request{
NamespacedName: namespacedName,
})
require.NoError(t, err)
Expand Down Expand Up @@ -1212,7 +1212,7 @@ func TestReconcileUpdateEndpoint(t *testing.T) {

// Create fake k8s client
k8sObjects := append(tt.k8sObjects(), fakeClientPod)
client := fake.NewFakeClient(k8sObjects...)
fakeClient := fake.NewClientBuilder().WithRuntimeObjects(k8sObjects...).Build()

// Create test consul server
consul, err := testutil.NewTestServerConfigT(t, func(c *testutil.TestServerConfig) {
Expand All @@ -1237,7 +1237,7 @@ func TestReconcileUpdateEndpoint(t *testing.T) {

// Create the endpoints controller
ep := &EndpointsController{
Client: client,
Client: fakeClient,
Log: logrtest.TestLogger{T: t},
ConsulClient: consulClient,
ConsulPort: consulPort,
Expand All @@ -1252,7 +1252,7 @@ func TestReconcileUpdateEndpoint(t *testing.T) {
Name: "service-updated",
}

resp, err := ep.Reconcile(ctrl.Request{
resp, err := ep.Reconcile(context.Background(), ctrl.Request{
NamespacedName: namespacedName,
})
require.NoError(t, err)
Expand Down Expand Up @@ -1348,7 +1348,7 @@ func TestReconcileDeleteEndpoint(t *testing.T) {
fakeClientPod.Labels = map[string]string{"component": "client", "app": "consul", "release": "consul"}

// Create fake k8s client
client := fake.NewFakeClient(fakeClientPod)
fakeClient := fake.NewClientBuilder().WithRuntimeObjects(fakeClientPod).Build()

// Create test consul server
consul, err := testutil.NewTestServerConfigT(t, func(c *testutil.TestServerConfig) {
Expand All @@ -1373,7 +1373,7 @@ func TestReconcileDeleteEndpoint(t *testing.T) {

// Create the endpoints controller
ep := &EndpointsController{
Client: client,
Client: fakeClient,
Log: logrtest.TestLogger{T: t},
ConsulClient: consulClient,
ConsulPort: consulPort,
Expand All @@ -1389,7 +1389,7 @@ func TestReconcileDeleteEndpoint(t *testing.T) {
Namespace: "default",
Name: "service-deleted",
}
resp, err := ep.Reconcile(ctrl.Request{
resp, err := ep.Reconcile(context.Background(), ctrl.Request{
NamespacedName: namespacedName,
})
require.NoError(t, err)
Expand All @@ -1410,11 +1410,11 @@ func TestReconcileDeleteEndpoint(t *testing.T) {
func TestFilterAgentPods(t *testing.T) {
t.Parallel()
cases := map[string]struct {
meta metav1.Object
object client.Object
expected bool
}{
"label[app]=consul label[component]=client label[release] consul": {
meta: &corev1.Pod{
object: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": "consul",
Expand All @@ -1426,11 +1426,11 @@ func TestFilterAgentPods(t *testing.T) {
expected: true,
},
"no labels": {
meta: &corev1.Pod{},
object: &corev1.Pod{},
expected: false,
},
"label[app] empty": {
meta: &corev1.Pod{
object: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"component": "client",
Expand All @@ -1441,7 +1441,7 @@ func TestFilterAgentPods(t *testing.T) {
expected: false,
},
"label[component] empty": {
meta: &corev1.Pod{
object: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": "consul",
Expand All @@ -1452,7 +1452,7 @@ func TestFilterAgentPods(t *testing.T) {
expected: false,
},
"label[release] empty": {
meta: &corev1.Pod{
object: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": "consul",
Expand All @@ -1463,7 +1463,7 @@ func TestFilterAgentPods(t *testing.T) {
expected: false,
},
"label[app]!=consul label[component]=client label[release]=consul": {
meta: &corev1.Pod{
object: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": "not-consul",
Expand All @@ -1475,7 +1475,7 @@ func TestFilterAgentPods(t *testing.T) {
expected: false,
},
"label[component]!=client label[app]=consul label[release]=consul": {
meta: &corev1.Pod{
object: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": "consul",
Expand All @@ -1487,7 +1487,7 @@ func TestFilterAgentPods(t *testing.T) {
expected: false,
},
"label[release]!=consul label[app]=consul label[component]=client": {
meta: &corev1.Pod{
object: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": "consul",
Expand All @@ -1499,7 +1499,7 @@ func TestFilterAgentPods(t *testing.T) {
expected: false,
},
"label[app]!=consul label[component]!=client label[release]!=consul": {
meta: &corev1.Pod{
object: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": "not-consul",
Expand All @@ -1518,7 +1518,7 @@ func TestFilterAgentPods(t *testing.T) {
ReleaseName: "consul",
}

result := controller.filterAgentPods(test.meta, nil)
result := controller.filterAgentPods(test.object)
require.Equal(t, test.expected, result)
})
}
Expand Down Expand Up @@ -1969,7 +1969,6 @@ func TestRequestsForRunningAgentPods(t *testing.T) {

for name, test := range cases {
t.Run(name, func(t *testing.T) {
ctx := context.Background()
logger := logrtest.TestLogger{T: t}
s := runtime.NewScheme()
s.AddKnownTypes(corev1.SchemeGroupVersion, &corev1.Pod{}, &corev1.Endpoints{}, &corev1.EndpointsList{})
Expand All @@ -1981,19 +1980,18 @@ func TestRequestsForRunningAgentPods(t *testing.T) {
objects = append(objects, endpoint)
}

fakeClient := fake.NewFakeClientWithScheme(s, objects...)
fakeClient := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objects...).Build()

controller := &EndpointsController{
Client: fakeClient,
Ctx: ctx,
Scheme: s,
Log: logger,
}
var requests []ctrl.Request
if test.agentPod != nil {
requests = controller.requestsForRunningAgentPods(handler.MapObject{Meta: test.agentPod})
requests = controller.requestsForRunningAgentPods(test.agentPod)
} else {
requests = controller.requestsForRunningAgentPods(handler.MapObject{Meta: minimal()})
requests = controller.requestsForRunningAgentPods(minimal())
}
require.ElementsMatch(t, requests, test.expectedRequests)
})
Expand Down
Loading

0 comments on commit 37a4384

Please sign in to comment.