From baf9c5ea7ae15b89aa32bda511e56bd3ad0aabce Mon Sep 17 00:00:00 2001 From: ajanikow <12255597+ajanikow@users.noreply.github.com> Date: Thu, 28 Apr 2022 15:16:16 +0000 Subject: [PATCH] [Feature] Endpoints inspector --- CHANGELOG.md | 1 + pkg/deployment/deployment.go | 3 +- .../resources/inspector/endpoints.go | 185 ++++++++++++++++++ .../resources/inspector/endpoints_v1.go | 140 +++++++++++++ .../resources/inspector/inspector.go | 11 ++ .../resources/inspector/inspector_test.go | 4 +- .../k8sutil/inspector/endpoints/definition.go | 36 ++++ .../k8sutil/inspector/endpoints/v1/loader.go | 48 +++++ .../k8sutil/inspector/endpoints/v1/reader.go | 49 +++++ pkg/util/k8sutil/inspector/inspector.go | 2 + .../k8sutil/inspector/throttle/throttle.go | 16 +- 11 files changed, 490 insertions(+), 5 deletions(-) create mode 100644 pkg/deployment/resources/inspector/endpoints.go create mode 100644 pkg/deployment/resources/inspector/endpoints_v1.go create mode 100644 pkg/util/k8sutil/inspector/endpoints/definition.go create mode 100644 pkg/util/k8sutil/inspector/endpoints/v1/loader.go create mode 100644 pkg/util/k8sutil/inspector/endpoints/v1/reader.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 9fa0e61d7..bb6c097a3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ # Change Log ## [master](https://github.com/arangodb/kube-arangodb/tree/master) (N/A) +- (Feature) Add CoreV1 Endpoints Inspector ## [1.2.11](https://github.com/arangodb/kube-arangodb/tree/1.2.11) (2022-04-30) - (Bugfix) Orphan PVC are not removed diff --git a/pkg/deployment/deployment.go b/pkg/deployment/deployment.go index c81f6a33c..78709b732 100644 --- a/pkg/deployment/deployment.go +++ b/pkg/deployment/deployment.go @@ -226,7 +226,8 @@ func newDeploymentThrottle() throttle.Components { 10*time.Second, // Secret 10*time.Second, // Service 30*time.Second, // SA - 30*time.Second) // ServiceMonitor + 30*time.Second, // ServiceMonitor + 15*time.Second) // Endpoints } // New creates a new Deployment from the given API object. diff --git a/pkg/deployment/resources/inspector/endpoints.go b/pkg/deployment/resources/inspector/endpoints.go new file mode 100644 index 000000000..8a00185a4 --- /dev/null +++ b/pkg/deployment/resources/inspector/endpoints.go @@ -0,0 +1,185 @@ +// +// DISCLAIMER +// +// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package inspector + +import ( + "context" + "time" + + "github.com/arangodb/kube-arangodb/pkg/util/errors" + "github.com/arangodb/kube-arangodb/pkg/util/globals" + "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/throttle" + core "k8s.io/api/core/v1" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func init() { + requireRegisterInspectorLoader(endpointsInspectorLoaderObj) +} + +var endpointsInspectorLoaderObj = endpointsInspectorLoader{} + +type endpointsInspectorLoader struct { +} + +func (p endpointsInspectorLoader) Component() throttle.Component { + return throttle.Endpoints +} + +func (p endpointsInspectorLoader) Load(ctx context.Context, i *inspectorState) { + var q endpointsInspector + p.loadV1(ctx, i, &q) + i.endpoints = &q + q.state = i + q.last = time.Now() +} + +func (p endpointsInspectorLoader) loadV1(ctx context.Context, i *inspectorState, q *endpointsInspector) { + var z endpointsInspectorV1 + + z.endpointsInspector = q + + z.endpoints, z.err = p.getV1Endpoints(ctx, i) + + q.v1 = &z +} + +func (p endpointsInspectorLoader) getV1Endpoints(ctx context.Context, i *inspectorState) (map[string]*core.Endpoints, error) { + objs, err := p.getV1EndpointsList(ctx, i) + if err != nil { + return nil, err + } + + r := make(map[string]*core.Endpoints, len(objs)) + + for id := range objs { + r[objs[id].GetName()] = objs[id] + } + + return r, nil +} + +func (p endpointsInspectorLoader) getV1EndpointsList(ctx context.Context, i *inspectorState) ([]*core.Endpoints, error) { + ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx) + defer cancel() + obj, err := i.client.Kubernetes().CoreV1().Endpoints(i.namespace).List(ctxChild, meta.ListOptions{ + Limit: globals.GetGlobals().Kubernetes().RequestBatchSize().Get(), + }) + + if err != nil { + return nil, err + } + + items := obj.Items + cont := obj.Continue + var s = int64(len(items)) + + if z := obj.RemainingItemCount; z != nil { + s += *z + } + + ptrs := make([]*core.Endpoints, 0, s) + + for { + for id := range items { + ptrs = append(ptrs, &items[id]) + } + + if cont == "" { + break + } + + items, cont, err = p.getV1EndpointsListRequest(ctx, i, cont) + + if err != nil { + return nil, err + } + } + + return ptrs, nil +} + +func (p endpointsInspectorLoader) getV1EndpointsListRequest(ctx context.Context, i *inspectorState, cont string) ([]core.Endpoints, string, error) { + ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx) + defer cancel() + obj, err := i.client.Kubernetes().CoreV1().Endpoints(i.namespace).List(ctxChild, meta.ListOptions{ + Limit: globals.GetGlobals().Kubernetes().RequestBatchSize().Get(), + Continue: cont, + }) + + if err != nil { + return nil, "", err + } + + return obj.Items, obj.Continue, err +} + +func (p endpointsInspectorLoader) Verify(i *inspectorState) error { + return nil +} + +func (p endpointsInspectorLoader) Copy(from, to *inspectorState, override bool) { + if to.endpoints != nil { + if !override { + return + } + } + + to.endpoints = from.endpoints + to.endpoints.state = to +} + +func (p endpointsInspectorLoader) Name() string { + return "endpoints" +} + +type endpointsInspector struct { + state *inspectorState + + last time.Time + + v1 *endpointsInspectorV1 +} + +func (p *endpointsInspector) LastRefresh() time.Time { + return p.last +} + +func (p *endpointsInspector) Refresh(ctx context.Context) error { + p.Throttle(p.state.throttles).Invalidate() + return p.state.refresh(ctx, endpointsInspectorLoaderObj) +} + +func (p endpointsInspector) Throttle(c throttle.Components) throttle.Throttle { + return c.Endpoints() +} + +func (p *endpointsInspector) validate() error { + if p == nil { + return errors.Newf("EndpointsInspector is nil") + } + + if p.state == nil { + return errors.Newf("Parent is nil") + } + + return p.v1.validate() +} diff --git a/pkg/deployment/resources/inspector/endpoints_v1.go b/pkg/deployment/resources/inspector/endpoints_v1.go new file mode 100644 index 000000000..952cc7a17 --- /dev/null +++ b/pkg/deployment/resources/inspector/endpoints_v1.go @@ -0,0 +1,140 @@ +// +// DISCLAIMER +// +// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package inspector + +import ( + "context" + + "github.com/arangodb/kube-arangodb/pkg/util/errors" + ins "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/endpoints/v1" + core "k8s.io/api/core/v1" + apiErrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +func (p *endpointsInspector) V1() (ins.Inspector, error) { + if p.v1.err != nil { + return nil, p.v1.err + } + + return p.v1, nil +} + +type endpointsInspectorV1 struct { + endpointsInspector *endpointsInspector + + endpoints map[string]*core.Endpoints + err error +} + +func (p *endpointsInspectorV1) Filter(filters ...ins.Filter) []*core.Endpoints { + z := p.ListSimple() + + r := make([]*core.Endpoints, 0, len(z)) + + for _, o := range z { + if !ins.FilterObject(o, filters...) { + continue + } + + r = append(r, o) + } + + return r +} + +func (p *endpointsInspectorV1) validate() error { + if p == nil { + return errors.Newf("EndpointsV1Inspector is nil") + } + + if p.endpointsInspector == nil { + return errors.Newf("Parent is nil") + } + + if p.endpoints == nil && p.err == nil { + return errors.Newf("Endpoints or err should be not nil") + } + + if p.endpoints != nil && p.err != nil { + return errors.Newf("Endpoints or err cannot be not nil together") + } + + return nil +} + +func (p *endpointsInspectorV1) ListSimple() []*core.Endpoints { + var r []*core.Endpoints + for _, endpoints := range p.endpoints { + r = append(r, endpoints) + } + + return r +} + +func (p *endpointsInspectorV1) GetSimple(name string) (*core.Endpoints, bool) { + endpoints, ok := p.endpoints[name] + if !ok { + return nil, false + } + + return endpoints, true +} + +func (p *endpointsInspectorV1) Iterate(action ins.Action, filters ...ins.Filter) error { + for _, endpoints := range p.endpoints { + if err := p.iterateEndpoints(endpoints, action, filters...); err != nil { + return err + } + } + + return nil +} + +func (p *endpointsInspectorV1) iterateEndpoints(endpoints *core.Endpoints, action ins.Action, filters ...ins.Filter) error { + for _, f := range filters { + if f == nil { + continue + } + + if !f(endpoints) { + return nil + } + } + + return action(endpoints) +} + +func (p *endpointsInspectorV1) Read() ins.ReadInterface { + return p +} + +func (p *endpointsInspectorV1) Get(ctx context.Context, name string, opts metav1.GetOptions) (*core.Endpoints, error) { + if s, ok := p.GetSimple(name); !ok { + return nil, apiErrors.NewNotFound(schema.GroupResource{ + Group: core.GroupName, + Resource: "endpoints", + }, name) + } else { + return s, nil + } +} diff --git a/pkg/deployment/resources/inspector/inspector.go b/pkg/deployment/resources/inspector/inspector.go index 62ca37786..1c36b9ab5 100644 --- a/pkg/deployment/resources/inspector/inspector.go +++ b/pkg/deployment/resources/inspector/inspector.go @@ -33,6 +33,7 @@ import ( "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/arangoclustersynchronization" "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/arangomember" "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/arangotask" + "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/endpoints" "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/node" "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/persistentvolumeclaim" "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/pod" @@ -134,6 +135,7 @@ type inspectorState struct { arangoMembers *arangoMembersInspector arangoTasks *arangoTasksInspector arangoClusterSynchronizations *arangoClusterSynchronizationsInspector + endpoints *endpointsInspector throttles throttle.Components @@ -142,6 +144,10 @@ type inspectorState struct { initialised bool } +func (i *inspectorState) Endpoints() endpoints.Definition { + return i.endpoints +} + func (i *inspectorState) Initialised() bool { if i == nil { return false @@ -345,6 +351,10 @@ func (i *inspectorState) validate() error { return err } + if err := i.endpoints.validate(); err != nil { + return err + } + return nil } @@ -365,6 +375,7 @@ func (i *inspectorState) copyCore() *inspectorState { arangoClusterSynchronizations: i.arangoClusterSynchronizations, throttles: i.throttles.Copy(), versionInfo: i.versionInfo, + endpoints: i.endpoints, logger: i.logger, } } diff --git a/pkg/deployment/resources/inspector/inspector_test.go b/pkg/deployment/resources/inspector/inspector_test.go index 33facebee..14ad38d78 100644 --- a/pkg/deployment/resources/inspector/inspector_test.go +++ b/pkg/deployment/resources/inspector/inspector_test.go @@ -133,7 +133,7 @@ func getAllTypes() []string { func Test_Inspector_RefreshMatrix(t *testing.T) { c := kclient.NewFakeClient() - tc := throttle.NewThrottleComponents(time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour) + tc := throttle.NewThrottleComponents(time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour) i := NewInspector(tc, c, "test") @@ -293,7 +293,7 @@ func Test_Inspector_Load(t *testing.T) { func Test_Inspector_Invalidate(t *testing.T) { c := kclient.NewFakeClient() - tc := throttle.NewThrottleComponents(time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour) + tc := throttle.NewThrottleComponents(time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour) i := NewInspector(tc, c, "test") diff --git a/pkg/util/k8sutil/inspector/endpoints/definition.go b/pkg/util/k8sutil/inspector/endpoints/definition.go new file mode 100644 index 000000000..713bab570 --- /dev/null +++ b/pkg/util/k8sutil/inspector/endpoints/definition.go @@ -0,0 +1,36 @@ +// +// DISCLAIMER +// +// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package endpoints + +import ( + v1 "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/endpoints/v1" + "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/refresh" +) + +type Inspector interface { + Endpoints() Definition +} + +type Definition interface { + refresh.Inspector + + V1() (v1.Inspector, error) +} diff --git a/pkg/util/k8sutil/inspector/endpoints/v1/loader.go b/pkg/util/k8sutil/inspector/endpoints/v1/loader.go new file mode 100644 index 000000000..f0144312a --- /dev/null +++ b/pkg/util/k8sutil/inspector/endpoints/v1/loader.go @@ -0,0 +1,48 @@ +// +// DISCLAIMER +// +// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package v1 + +import core "k8s.io/api/core/v1" + +type Inspector interface { + ListSimple() []*core.Endpoints + GetSimple(name string) (*core.Endpoints, bool) + Filter(filters ...Filter) []*core.Endpoints + Iterate(action Action, filters ...Filter) error + Read() ReadInterface +} + +type Filter func(at *core.Endpoints) bool +type Action func(at *core.Endpoints) error + +func FilterObject(at *core.Endpoints, filters ...Filter) bool { + for _, f := range filters { + if f == nil { + continue + } + + if !f(at) { + return false + } + } + + return true +} diff --git a/pkg/util/k8sutil/inspector/endpoints/v1/reader.go b/pkg/util/k8sutil/inspector/endpoints/v1/reader.go new file mode 100644 index 000000000..45a3428be --- /dev/null +++ b/pkg/util/k8sutil/inspector/endpoints/v1/reader.go @@ -0,0 +1,49 @@ +// +// DISCLAIMER +// +// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package v1 + +import ( + "context" + + core "k8s.io/api/core/v1" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" +) + +// ModInterface has methods to work with Endpoints resources only for creation +type ModInterface interface { + Create(ctx context.Context, endpoints *core.Endpoints, opts meta.CreateOptions) (*core.Endpoints, error) + Update(ctx context.Context, endpoints *core.Endpoints, opts meta.UpdateOptions) (*core.Endpoints, error) + UpdateStatus(ctx context.Context, endpoints *core.Endpoints, opts meta.UpdateOptions) (*core.Endpoints, error) + Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts meta.PatchOptions, subresources ...string) (result *core.Endpoints, err error) + Delete(ctx context.Context, name string, opts meta.DeleteOptions) error +} + +// Interface has methods to work with Endpoints resources. +type Interface interface { + ModInterface + ReadInterface +} + +// ReadInterface has methods to work with Endpoints resources with ReadOnly mode. +type ReadInterface interface { + Get(ctx context.Context, name string, opts meta.GetOptions) (*core.Endpoints, error) +} diff --git a/pkg/util/k8sutil/inspector/inspector.go b/pkg/util/k8sutil/inspector/inspector.go index 1e825ef6a..65ede96e9 100644 --- a/pkg/util/k8sutil/inspector/inspector.go +++ b/pkg/util/k8sutil/inspector/inspector.go @@ -28,6 +28,7 @@ import ( "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/arangoclustersynchronization" "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/arangomember" "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/arangotask" + "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/endpoints" "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/persistentvolumeclaim" "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/pod" "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/poddisruptionbudget" @@ -57,6 +58,7 @@ type Inspector interface { serviceaccount.Inspector arangomember.Inspector server.Inspector + endpoints.Inspector node.Inspector arangoclustersynchronization.Inspector diff --git a/pkg/util/k8sutil/inspector/throttle/throttle.go b/pkg/util/k8sutil/inspector/throttle/throttle.go index 834223034..f042c36fd 100644 --- a/pkg/util/k8sutil/inspector/throttle/throttle.go +++ b/pkg/util/k8sutil/inspector/throttle/throttle.go @@ -30,10 +30,10 @@ type Inspector interface { } func NewAlwaysThrottleComponents() Components { - return NewThrottleComponents(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) + return NewThrottleComponents(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) } -func NewThrottleComponents(acs, am, at, node, pvc, pod, pdb, secret, service, serviceAccount, sm time.Duration) Components { +func NewThrottleComponents(acs, am, at, node, pvc, pod, pdb, secret, service, serviceAccount, sm, endpoints time.Duration) Components { return &throttleComponents{ arangoClusterSynchronization: NewThrottle(acs), arangoMember: NewThrottle(am), @@ -46,6 +46,7 @@ func NewThrottleComponents(acs, am, at, node, pvc, pod, pdb, secret, service, se service: NewThrottle(service), serviceAccount: NewThrottle(serviceAccount), serviceMonitor: NewThrottle(sm), + endpoints: NewThrottle(endpoints), } } @@ -65,6 +66,7 @@ const ( Service Component = "Service" ServiceAccount Component = "ServiceAccount" ServiceMonitor Component = "ServiceMonitor" + Endpoints Component = "Endpoints" ) func AllComponents() []Component { @@ -80,6 +82,7 @@ func AllComponents() []Component { Service, ServiceAccount, ServiceMonitor, + Endpoints, } } @@ -95,6 +98,7 @@ type Components interface { Service() Throttle ServiceAccount() Throttle ServiceMonitor() Throttle + Endpoints() Throttle Get(c Component) Throttle Invalidate(components ...Component) @@ -115,6 +119,11 @@ type throttleComponents struct { service Throttle serviceAccount Throttle serviceMonitor Throttle + endpoints Throttle +} + +func (t *throttleComponents) Endpoints() Throttle { + return t.endpoints } func (t *throttleComponents) Counts() ComponentCount { @@ -160,6 +169,8 @@ func (t *throttleComponents) Get(c Component) Throttle { return t.serviceAccount case ServiceMonitor: return t.serviceMonitor + case Endpoints: + return t.endpoints default: return NewAlwaysThrottle() } @@ -178,6 +189,7 @@ func (t *throttleComponents) Copy() Components { service: t.service.Copy(), serviceAccount: t.serviceAccount.Copy(), serviceMonitor: t.serviceMonitor.Copy(), + endpoints: t.endpoints.Copy(), } }