
Commit

Revert "MAISTRA-2051: Use shared Kubernetes client in galley (maistra#241)"

This reverts commit 0b1567a.
bison committed Feb 1, 2021
1 parent 3aed634 commit 5a2b03c
Showing 15 changed files with 373 additions and 116 deletions.
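The heart of the revert shows up first in analyze.go below: AddRunningKubeSource returns to accepting galley's own kube.Interfaces instead of Istio's shared kubelib.Client, so callers must once again handle the error that obtaining a clientset can produce. A minimal sketch of the two calling conventions, assuming only the istio.io/istio types named in this diff (the wrapper functions here are hypothetical):

package example

import (
    "fmt"

    "k8s.io/client-go/kubernetes"

    "istio.io/istio/galley/pkg/config/source/kube"
    kubelib "istio.io/istio/pkg/kube"
)

// Before the revert: the shared client's Kube() accessor cannot fail.
func fromSharedClient(k kubelib.Client) kubernetes.Interface {
    return k.Kube()
}

// After the revert: kube.Interfaces may fail to produce a clientset, so the
// error must be checked, as AddRunningKubeSource does again below.
func fromInterfaces(k kube.Interfaces) (kubernetes.Interface, error) {
    client, err := k.KubeClient()
    if err != nil {
        return nil, fmt.Errorf("error getting KubeClient: %w", err)
    }
    return client, nil
}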
10 changes: 7 additions & 3 deletions galley/pkg/config/analysis/local/analyze.go
@@ -37,6 +37,7 @@ import (
     "istio.io/istio/galley/pkg/config/processor/transforms"
     "istio.io/istio/galley/pkg/config/scope"
     "istio.io/istio/galley/pkg/config/source/inmemory"
+    "istio.io/istio/galley/pkg/config/source/kube"
     "istio.io/istio/galley/pkg/config/source/kube/apiserver"
     kube_inmemory "istio.io/istio/galley/pkg/config/source/kube/inmemory"
     "istio.io/istio/galley/pkg/config/util/kuberesource"
@@ -47,7 +48,6 @@ import (
     "istio.io/istio/pkg/config/schema/collection"
     "istio.io/istio/pkg/config/schema/collections"
     "istio.io/istio/pkg/config/schema/snapshots"
-    kubelib "istio.io/istio/pkg/kube"
 )

 const (
@@ -267,8 +267,12 @@ func (sa *SourceAnalyzer) AddReaderKubeSource(readers []ReaderSource) error {

 // AddRunningKubeSource adds a source based on a running k8s cluster to the current SourceAnalyzer
 // Also tries to get mesh config from the running cluster, if it can
-func (sa *SourceAnalyzer) AddRunningKubeSource(k kubelib.Client) {
-    client := k.Kube()
+func (sa *SourceAnalyzer) AddRunningKubeSource(k kube.Interfaces) {
+    client, err := k.KubeClient()
+    if err != nil {
+        scope.Analysis.Errorf("error getting KubeClient: %v", err)
+        return
+    }

     // Since we're using a running k8s source, try to get meshconfig and meshnetworks from the configmap.
     if err := sa.addRunningKubeIstioConfigMapSource(client); err != nil {
20 changes: 11 additions & 9 deletions galley/pkg/config/analysis/local/analyze_test.go
@@ -35,10 +35,10 @@ import (
     "istio.io/istio/galley/pkg/config/testing/data"
     "istio.io/istio/galley/pkg/config/testing/k8smeta"
     "istio.io/istio/galley/pkg/config/util/kubeyaml"
+    "istio.io/istio/galley/pkg/testing/mock"
     "istio.io/istio/pkg/config/resource"
     "istio.io/istio/pkg/config/schema"
     "istio.io/istio/pkg/config/schema/collection"
-    kubelib "istio.io/istio/pkg/kube"
 )

 type testAnalyzer struct {
@@ -150,11 +150,11 @@ func TestAddInMemorySource(t *testing.T) {
 func TestAddRunningKubeSource(t *testing.T) {
     g := NewWithT(t)

-    k := kubelib.NewFakeClient()
+    mk := mock.NewKube()

     sa := NewSourceAnalyzer(k8smeta.MustGet(), blankCombinedAnalyzer, "", "", nil, false, timeout)

-    sa.AddRunningKubeSource(k)
+    sa.AddRunningKubeSource(mk)
     g.Expect(*sa.meshCfg).To(Equal(*mesh.DefaultMeshConfig())) // Base default meshcfg
     g.Expect(sa.meshNetworks.Networks).To(HaveLen(0))
     g.Expect(sa.sources).To(HaveLen(1))
Expand All @@ -178,16 +178,18 @@ func TestAddRunningKubeSourceWithIstioMeshConfigMap(t *testing.T) {
},
}

k := kubelib.NewFakeClient()
client := k.Kube()

mk := mock.NewKube()
client, err := mk.KubeClient()
if err != nil {
t.Fatalf("Error getting client for mock kube: %v", err)
}
if _, err := client.CoreV1().ConfigMaps(istioNamespace.String()).Create(context.TODO(), cfg, metav1.CreateOptions{}); err != nil {
t.Fatalf("Error creating mesh config configmap: %v", err)
}

sa := NewSourceAnalyzer(k8smeta.MustGet(), blankCombinedAnalyzer, "", istioNamespace, nil, false, timeout)

sa.AddRunningKubeSource(k)
sa.AddRunningKubeSource(mk)
g.Expect(sa.meshCfg.RootNamespace).To(Equal(testRootNamespace))
g.Expect(sa.meshNetworks.Networks).To(HaveLen(2))
g.Expect(sa.sources).To(HaveLen(1))
@@ -272,10 +274,10 @@ func TestResourceFiltering(t *testing.T) {
         fn: func(_ analysis.Context) {},
         inputs: []collection.Name{usedCollection.Name()},
     }
-    k := kubelib.NewFakeClient()
+    mk := mock.NewKube()

     sa := NewSourceAnalyzer(schema.MustGet(), analysis.Combine("a", a), "", "", nil, true, timeout)
-    sa.AddRunningKubeSource(k)
+    sa.AddRunningKubeSource(mk)

     // All but the used collection should be disabled
     for _, r := range recordedOptions.Schemas.All() {
10 changes: 8 additions & 2 deletions galley/pkg/config/source/kube/apiserver/options.go
@@ -15,17 +15,23 @@
 package apiserver

 import (
+    "time"
+
+    "istio.io/istio/galley/pkg/config/source/kube"
     "istio.io/istio/galley/pkg/config/source/kube/apiserver/status"
     "istio.io/istio/pkg/config/schema/collection"
-    kubelib "istio.io/istio/pkg/kube"
 )

 // Options for the kube controller
 type Options struct {
     // The Client interfaces to use for connecting to the API server.
-    Client kubelib.Client
+    Client kube.Interfaces

+    ResyncPeriod time.Duration
+
     Schemas collection.Schemas

     StatusController status.Controller
+
+    WatchedNamespaces string
 }
2 changes: 1 addition & 1 deletion galley/pkg/config/source/kube/apiserver/source.go
@@ -125,7 +125,7 @@ func (s *Source) Start() {

     // Start the CRD listener. When the listener is fully-synced, the listening of actual resources will start.
     scope.Source.Infof("Beginning CRD Discovery, to figure out resources that are available...")
-    s.provider = rt.NewProvider(s.options.Client)
+    s.provider = rt.NewProvider(s.options.Client, s.options.WatchedNamespaces, s.options.ResyncPeriod)
     a := s.provider.GetAdapter(crdKubeResource.Resource())
     s.crdWatcher = newWatcher(crdKubeResource, a, s.statusCtl)
     s.crdWatcher.dispatch(event.HandlerFromFn(s.onCrdEvent))
33 changes: 16 additions & 17 deletions galley/pkg/config/source/kube/apiserver/source_builtin_test.go
@@ -1,5 +1,4 @@
 // Copyright Istio Authors
-
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -27,10 +26,10 @@ import (
     "istio.io/istio/galley/pkg/config/scope"
     "istio.io/istio/galley/pkg/config/testing/fixtures"
     "istio.io/istio/galley/pkg/config/testing/k8smeta"
+    "istio.io/istio/galley/pkg/testing/mock"
     "istio.io/istio/pkg/config/event"
     "istio.io/istio/pkg/config/resource"
     resource2 "istio.io/istio/pkg/config/schema/resource"
-    kubelib "istio.io/istio/pkg/kube"
     "istio.io/pkg/log"
 )

@@ -66,8 +65,9 @@ func TestBasic(t *testing.T) {
     prevLevel := setDebugLogLevel()
     defer restoreLogLevel(prevLevel)

-    k := kubelib.NewFakeClient()
-    client := k.Kube()
+    k := mock.NewKube()
+    client, err := k.KubeClient()
+    g.Expect(err).To(BeNil())

     // Start the source.
     s := newOrFail(t, k, k8smeta.MustGet().KubeCollections(), nil)
@@ -81,7 +81,6 @@ func TestBasic(t *testing.T) {

     acc.Clear()

-    var err error
     node := &corev1.Node{
         ObjectMeta: fakeObjectMeta,
         Spec: corev1.NodeSpec{
Expand All @@ -107,8 +106,9 @@ func TestNodes(t *testing.T) {
prevLevel := setDebugLogLevel()
defer restoreLogLevel(prevLevel)

k := kubelib.NewFakeClient()
client := k.Kube()
k := mock.NewKube()
client, err := k.KubeClient()
g.Expect(err).To(BeNil())

// Start the source.
s := newOrFail(t, k, metadata, nil)
Expand All @@ -121,7 +121,6 @@ func TestNodes(t *testing.T) {
}
acc.Clear()

var err error
node := &corev1.Node{
ObjectMeta: fakeObjectMeta,
Spec: corev1.NodeSpec{
@@ -175,8 +174,9 @@ func TestPods(t *testing.T) {
     prevLevel := setDebugLogLevel()
     defer restoreLogLevel(prevLevel)

-    k := kubelib.NewFakeClient()
-    client := k.Kube()
+    k := mock.NewKube()
+    client, err := k.KubeClient()
+    g.Expect(err).To(BeNil())

     // Start the source.
     s := newOrFail(t, k, metadata, nil)
@@ -189,7 +189,6 @@ func TestPods(t *testing.T) {
     }
     acc.Clear()

-    var err error
     pod := &corev1.Pod{
         ObjectMeta: fakeObjectMeta,
         Spec: corev1.PodSpec{
@@ -253,8 +252,9 @@ func TestServices(t *testing.T) {
     prevLevel := setDebugLogLevel()
     defer restoreLogLevel(prevLevel)

-    k := kubelib.NewFakeClient()
-    client := k.Kube()
+    k := mock.NewKube()
+    client, err := k.KubeClient()
+    g.Expect(err).To(BeNil())

     // Start the source.
     s := newOrFail(t, k, metadata, nil)
@@ -267,7 +267,6 @@ func TestServices(t *testing.T) {
     }
     acc.Clear()

-    var err error
     svc := &corev1.Service{
         ObjectMeta: fakeObjectMeta,
         Spec: corev1.ServiceSpec{
@@ -326,8 +325,9 @@ func TestEndpoints(t *testing.T) {
     prevLevel := setDebugLogLevel()
     defer restoreLogLevel(prevLevel)

-    k := kubelib.NewFakeClient()
-    client := k.Kube()
+    k := mock.NewKube()
+    client, err := k.KubeClient()
+    g.Expect(err).To(BeNil())

     // Start the source.
     s := newOrFail(t, k, metadata, nil)
@@ -340,7 +340,6 @@ func TestEndpoints(t *testing.T) {
     }
     acc.Clear()

-    var err error
     eps := &corev1.Endpoints{
         ObjectMeta: fakeObjectMeta,
         Subsets: []corev1.EndpointSubset{
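Every test in this file now repeats the same setup: build a mock.Kube, ask it for a clientset, and assert the error is nil. A condensed sketch of that pattern as a shared helper (the helper itself is hypothetical, not part of this commit):

package example

import (
    "testing"

    "k8s.io/client-go/kubernetes"

    "istio.io/istio/galley/pkg/testing/mock"
)

// setupMockKube captures the setup repeated in TestBasic, TestNodes,
// TestPods, TestServices, and TestEndpoints above.
func setupMockKube(t *testing.T) (*mock.Kube, kubernetes.Interface) {
    t.Helper()
    k := mock.NewKube()
    client, err := k.KubeClient()
    if err != nil {
        t.Fatalf("error getting KubeClient: %v", err)
    }
    return k, client
}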
54 changes: 35 additions & 19 deletions galley/pkg/config/source/kube/apiserver/source_dynamic_test.go
@@ -14,6 +14,7 @@
 package apiserver_test

 import (
+    "errors"
     "sync/atomic"
     "testing"

@@ -23,12 +24,14 @@ import (
     extfake "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake"
     v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+    k8sRuntime "k8s.io/apimachinery/pkg/runtime"
     "k8s.io/apimachinery/pkg/watch"
     "k8s.io/client-go/dynamic/fake"
     k8sTesting "k8s.io/client-go/testing"

     "istio.io/istio/galley/pkg/config/analysis/diag"
     "istio.io/istio/galley/pkg/config/analysis/msg"
+    "istio.io/istio/galley/pkg/config/source/kube"
     "istio.io/istio/galley/pkg/config/source/kube/apiserver"
     "istio.io/istio/galley/pkg/config/source/kube/apiserver/status"
     "istio.io/istio/galley/pkg/config/source/kube/rt"
@@ -39,11 +42,14 @@ import (
     "istio.io/istio/pkg/config/resource"
     "istio.io/istio/pkg/config/schema/collection"
     resource2 "istio.io/istio/pkg/config/schema/resource"
-    kubelib "istio.io/istio/pkg/kube"
 )

 func TestNewSource(t *testing.T) {
-    k := kubelib.NewFakeClient()
+    k := &mock.Kube{}
+    for i := 0; i < 100; i++ {
+        _ = fakeClient(k)
+    }
+
     r := basicmeta.MustGet().KubeCollections()

     _ = newOrFail(t, k, r, nil)
@@ -144,8 +150,8 @@ func TestEvents(t *testing.T) {

     obj := &unstructured.Unstructured{
         Object: map[string]interface{}{
-            "apiVersion": "networking.istio.io/v1beta1",
-            "kind": "Gateway",
+            "apiVersion": "testdata.istio.io/v1alpha1",
+            "kind": "Kind1",
             "metadata": map[string]interface{}{
                 "name": "i1",
                 "namespace": "ns",
@@ -208,8 +214,8 @@ func TestEvents_WatchUpdatesStatusCtl(t *testing.T) {

     obj := &unstructured.Unstructured{
         Object: map[string]interface{}{
-            "apiVersion": "networking.istio.io/v1beta1",
-            "kind": "Gateway",
+            "apiVersion": "testdata.istio.io/v1alpha1",
+            "kind": "Kind1",
             "metadata": map[string]interface{}{
                 "name": "i1",
                 "namespace": "ns",
@@ -336,16 +342,14 @@ func TestEvents_NoneForDisabled(t *testing.T) {
 }

 func TestSource_WatcherFailsCreatingInformer(t *testing.T) {
-    // This test doesn't seem to work well with the new fake client.
-    // Needs to be looked into.
-    //
-    // The linter wants an istio issue in the message...
-    t.Skip("https://github.com/istio/istio/issues/100000000")
+    k := mock.NewKube()
+    wcrd := mockCrdWatch(k.APIExtClientSet)

-    w, wcrd, k := createMocks()
     r := basicmeta.MustGet().KubeCollections()
     addCrdEvents(wcrd, r.All())

+    k.AddResponse(nil, errors.New("no cheese found"))
+
     // Create and start the source
     s := newOrFail(t, k, r, nil)
     // Start/stop when informer is not created. It should not crash or cause errors.
@@ -359,8 +363,14 @@ func TestSource_WatcherFailsCreatingInformer(t *testing.T) {
     acc.Clear()
     wcrd.Stop()

+    wcrd = mockCrdWatch(k.APIExtClientSet)
+    addCrdEvents(wcrd, r.All())
+
     // Now start properly and get events
+    cl := fake.NewSimpleDynamicClient(k8sRuntime.NewScheme())
+    k.AddResponse(cl, nil)
+    w := mockWatch(cl)

     s.Start()

     obj := &unstructured.Unstructured{
@@ -407,11 +417,12 @@ func TestUpdateMessage_NoStatusController_Panic(t *testing.T) {
     s.Update(diag.Messages{})
 }

-func newOrFail(t *testing.T, kubeClient kubelib.Client, r collection.Schemas, sc status.Controller) *apiserver.Source {
+func newOrFail(t *testing.T, ifaces kube.Interfaces, r collection.Schemas, sc status.Controller) *apiserver.Source {
     t.Helper()
     o := apiserver.Options{
         Schemas: r,
-        Client: kubeClient,
+        ResyncPeriod: 0,
+        Client: ifaces,
         StatusController: sc,
     }
     s := apiserver.New(o)
@@ -429,12 +440,11 @@ func start(s *apiserver.Source) *fixtures.Accumulator {
     return acc
 }

-func createMocks() (*mock.Watch, *mock.Watch, kubelib.Client) {
-    k := kubelib.NewFakeClient()
-    cl := k.Dynamic().(*fake.FakeDynamicClient)
-    ext := k.Ext().(*extfake.Clientset)
+func createMocks() (*mock.Watch, *mock.Watch, *mock.Kube) {
+    k := mock.NewKube()
+    cl := fakeClient(k)
     w := mockWatch(cl)
-    wcrd := mockCrdWatch(ext)
+    wcrd := mockCrdWatch(k.APIExtClientSet)
     return w, wcrd, k
 }

@@ -447,6 +457,12 @@ func addCrdEvents(w *mock.Watch, res []collection.Schema) {
     }
 }

+func fakeClient(k *mock.Kube) *fake.FakeDynamicClient {
+    cl := fake.NewSimpleDynamicClient(k8sRuntime.NewScheme())
+    k.AddResponse(cl, nil)
+    return cl
+}
+
 func mockWatch(cl *fake.FakeDynamicClient) *mock.Watch {
     w := mock.NewWatch()
     cl.PrependWatchReactor("*", func(_ k8sTesting.Action) (handled bool, ret watch.Interface, err error) {
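These tests lean on mock.Kube's queued responses: judging from fakeClient and the watcher-failure test above, each AddResponse(client, err) pair appears to be handed out, in order, to successive client requests, letting a test make the first informer creation fail and the next succeed. A sketch of that pattern under the same assumption:

package example

import (
    "errors"

    k8sRuntime "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/client-go/dynamic/fake"

    "istio.io/istio/galley/pkg/testing/mock"
)

// queueResponses shows the assumed queue semantics of mock.Kube.AddResponse:
// the first client request gets an error, the second a working fake client.
func queueResponses() *mock.Kube {
    k := mock.NewKube()
    k.AddResponse(nil, errors.New("no cheese found")) // first request fails
    cl := fake.NewSimpleDynamicClient(k8sRuntime.NewScheme())
    k.AddResponse(cl, nil) // second request succeeds
    return k
}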
