MAISTRA-2051: Use shared Kubernetes client in galley (maistra#241)
This moves galley to the shared Kubernetes client, which will let it
use xns-informers for multi-namespace support like everything else.
bison committed Jan 26, 2021
1 parent 27f243c commit 0b1567a
Showing 15 changed files with 116 additions and 373 deletions.
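The change follows a single pattern throughout the diff: call sites that previously took galley's kube.Interfaces and unwrapped a clientset with KubeClient() (handling a possible error) now take the shared kubelib.Client from istio.io/istio/pkg/kube and call Kube() directly, while tests swap mock.NewKube() for kubelib.NewFakeClient(). A minimal sketch of the new call-site shape; the analyzeCluster helper below is hypothetical and not part of this commit:

```go
package example

import (
	kubelib "istio.io/istio/pkg/kube"
)

// analyzeCluster is a hypothetical caller illustrating the pattern used in
// this commit: accept the shared client and obtain the typed clientset with
// Kube(). Unlike the old kube.Interfaces.KubeClient() call, this cannot fail,
// so the error handling at the call sites disappears.
func analyzeCluster(k kubelib.Client) {
	client := k.Kube() // kubernetes.Interface
	_ = client         // e.g. client.CoreV1().ConfigMaps(ns).Get(ctx, name, metav1.GetOptions{})
}
```

In tests, kubelib.NewFakeClient() stands in for the removed mock.NewKube(), so the same code paths run against in-memory fake clientsets.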
10 changes: 3 additions & 7 deletions galley/pkg/config/analysis/local/analyze.go
@@ -37,7 +37,6 @@ import (
"istio.io/istio/galley/pkg/config/processor/transforms"
"istio.io/istio/galley/pkg/config/scope"
"istio.io/istio/galley/pkg/config/source/inmemory"
"istio.io/istio/galley/pkg/config/source/kube"
"istio.io/istio/galley/pkg/config/source/kube/apiserver"
kube_inmemory "istio.io/istio/galley/pkg/config/source/kube/inmemory"
"istio.io/istio/galley/pkg/config/util/kuberesource"
@@ -48,6 +47,7 @@ import (
"istio.io/istio/pkg/config/schema/collection"
"istio.io/istio/pkg/config/schema/collections"
"istio.io/istio/pkg/config/schema/snapshots"
kubelib "istio.io/istio/pkg/kube"
)

const (
@@ -267,12 +267,8 @@ func (sa *SourceAnalyzer) AddReaderKubeSource(readers []ReaderSource) error {

// AddRunningKubeSource adds a source based on a running k8s cluster to the current SourceAnalyzer
// Also tries to get mesh config from the running cluster, if it can
func (sa *SourceAnalyzer) AddRunningKubeSource(k kube.Interfaces) {
client, err := k.KubeClient()
if err != nil {
scope.Analysis.Errorf("error getting KubeClient: %v", err)
return
}
func (sa *SourceAnalyzer) AddRunningKubeSource(k kubelib.Client) {
client := k.Kube()

// Since we're using a running k8s source, try to get meshconfig and meshnetworks from the configmap.
if err := sa.addRunningKubeIstioConfigMapSource(client); err != nil {
20 changes: 9 additions & 11 deletions galley/pkg/config/analysis/local/analyze_test.go
@@ -35,10 +35,10 @@ import (
"istio.io/istio/galley/pkg/config/testing/data"
"istio.io/istio/galley/pkg/config/testing/k8smeta"
"istio.io/istio/galley/pkg/config/util/kubeyaml"
"istio.io/istio/galley/pkg/testing/mock"
"istio.io/istio/pkg/config/resource"
"istio.io/istio/pkg/config/schema"
"istio.io/istio/pkg/config/schema/collection"
kubelib "istio.io/istio/pkg/kube"
)

type testAnalyzer struct {
@@ -150,11 +150,11 @@ func TestAddInMemorySource(t *testing.T) {
func TestAddRunningKubeSource(t *testing.T) {
g := NewWithT(t)

mk := mock.NewKube()
k := kubelib.NewFakeClient()

sa := NewSourceAnalyzer(k8smeta.MustGet(), blankCombinedAnalyzer, "", "", nil, false, timeout)

sa.AddRunningKubeSource(mk)
sa.AddRunningKubeSource(k)
g.Expect(*sa.meshCfg).To(Equal(*mesh.DefaultMeshConfig())) // Base default meshcfg
g.Expect(sa.meshNetworks.Networks).To(HaveLen(0))
g.Expect(sa.sources).To(HaveLen(1))
@@ -178,18 +178,16 @@ func TestAddRunningKubeSourceWithIstioMeshConfigMap(t *testing.T) {
},
}

mk := mock.NewKube()
client, err := mk.KubeClient()
if err != nil {
t.Fatalf("Error getting client for mock kube: %v", err)
}
k := kubelib.NewFakeClient()
client := k.Kube()

if _, err := client.CoreV1().ConfigMaps(istioNamespace.String()).Create(context.TODO(), cfg, metav1.CreateOptions{}); err != nil {
t.Fatalf("Error creating mesh config configmap: %v", err)
}

sa := NewSourceAnalyzer(k8smeta.MustGet(), blankCombinedAnalyzer, "", istioNamespace, nil, false, timeout)

sa.AddRunningKubeSource(mk)
sa.AddRunningKubeSource(k)
g.Expect(sa.meshCfg.RootNamespace).To(Equal(testRootNamespace))
g.Expect(sa.meshNetworks.Networks).To(HaveLen(2))
g.Expect(sa.sources).To(HaveLen(1))
@@ -274,10 +272,10 @@ func TestResourceFiltering(t *testing.T) {
fn: func(_ analysis.Context) {},
inputs: []collection.Name{usedCollection.Name()},
}
mk := mock.NewKube()
k := kubelib.NewFakeClient()

sa := NewSourceAnalyzer(schema.MustGet(), analysis.Combine("a", a), "", "", nil, true, timeout)
sa.AddRunningKubeSource(mk)
sa.AddRunningKubeSource(k)

// All but the used collection should be disabled
for _, r := range recordedOptions.Schemas.All() {
10 changes: 2 additions & 8 deletions galley/pkg/config/source/kube/apiserver/options.go
@@ -15,23 +15,17 @@
package apiserver

import (
"time"

"istio.io/istio/galley/pkg/config/source/kube"
"istio.io/istio/galley/pkg/config/source/kube/apiserver/status"
"istio.io/istio/pkg/config/schema/collection"
kubelib "istio.io/istio/pkg/kube"
)

// Options for the kube controller
type Options struct {
// The Client interfaces to use for connecting to the API server.
Client kube.Interfaces

ResyncPeriod time.Duration
Client kubelib.Client

Schemas collection.Schemas

StatusController status.Controller

WatchedNamespaces string
}
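With ResyncPeriod removed from Options, constructing the source reduces to passing the shared client alongside the schemas and status controller. A sketch of the new construction, mirroring the newOrFail helper further down in this diff:

```go
o := apiserver.Options{
	Schemas:          r,          // collection.Schemas to watch
	Client:           kubeClient, // the shared kubelib.Client
	StatusController: sc,
}
s := apiserver.New(o)
```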
2 changes: 1 addition & 1 deletion galley/pkg/config/source/kube/apiserver/source.go
@@ -125,7 +125,7 @@ func (s *Source) Start() {

// Start the CRD listener. When the listener is fully-synced, the listening of actual resources will start.
scope.Source.Infof("Beginning CRD Discovery, to figure out resources that are available...")
s.provider = rt.NewProvider(s.options.Client, s.options.WatchedNamespaces, s.options.ResyncPeriod)
s.provider = rt.NewProvider(s.options.Client)
a := s.provider.GetAdapter(crdKubeResource.Resource())
s.crdWatcher = newWatcher(crdKubeResource, a, s.statusCtl)
s.crdWatcher.dispatch(event.HandlerFromFn(s.onCrdEvent))
33 changes: 17 additions & 16 deletions galley/pkg/config/source/kube/apiserver/source_builtin_test.go
@@ -1,4 +1,5 @@
// Copyright Istio Authors

//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -26,10 +27,10 @@ import (
"istio.io/istio/galley/pkg/config/scope"
"istio.io/istio/galley/pkg/config/testing/fixtures"
"istio.io/istio/galley/pkg/config/testing/k8smeta"
"istio.io/istio/galley/pkg/testing/mock"
"istio.io/istio/pkg/config/event"
"istio.io/istio/pkg/config/resource"
resource2 "istio.io/istio/pkg/config/schema/resource"
kubelib "istio.io/istio/pkg/kube"
"istio.io/pkg/log"
)

@@ -65,9 +66,8 @@ func TestBasic(t *testing.T) {
prevLevel := setDebugLogLevel()
defer restoreLogLevel(prevLevel)

k := mock.NewKube()
client, err := k.KubeClient()
g.Expect(err).To(BeNil())
k := kubelib.NewFakeClient()
client := k.Kube()

// Start the source.
s := newOrFail(t, k, k8smeta.MustGet().KubeCollections(), nil)
@@ -81,6 +81,7 @@

acc.Clear()

var err error
node := &corev1.Node{
ObjectMeta: fakeObjectMeta,
Spec: corev1.NodeSpec{
Expand All @@ -106,9 +107,8 @@ func TestNodes(t *testing.T) {
prevLevel := setDebugLogLevel()
defer restoreLogLevel(prevLevel)

k := mock.NewKube()
client, err := k.KubeClient()
g.Expect(err).To(BeNil())
k := kubelib.NewFakeClient()
client := k.Kube()

// Start the source.
s := newOrFail(t, k, metadata, nil)
@@ -121,6 +121,7 @@
}
acc.Clear()

var err error
node := &corev1.Node{
ObjectMeta: fakeObjectMeta,
Spec: corev1.NodeSpec{
@@ -174,9 +175,8 @@ func TestPods(t *testing.T) {
prevLevel := setDebugLogLevel()
defer restoreLogLevel(prevLevel)

k := mock.NewKube()
client, err := k.KubeClient()
g.Expect(err).To(BeNil())
k := kubelib.NewFakeClient()
client := k.Kube()

// Start the source.
s := newOrFail(t, k, metadata, nil)
@@ -189,6 +189,7 @@
}
acc.Clear()

var err error
pod := &corev1.Pod{
ObjectMeta: fakeObjectMeta,
Spec: corev1.PodSpec{
@@ -252,9 +253,8 @@ func TestServices(t *testing.T) {
prevLevel := setDebugLogLevel()
defer restoreLogLevel(prevLevel)

k := mock.NewKube()
client, err := k.KubeClient()
g.Expect(err).To(BeNil())
k := kubelib.NewFakeClient()
client := k.Kube()

// Start the source.
s := newOrFail(t, k, metadata, nil)
@@ -267,6 +267,7 @@
}
acc.Clear()

var err error
svc := &corev1.Service{
ObjectMeta: fakeObjectMeta,
Spec: corev1.ServiceSpec{
@@ -325,9 +326,8 @@ func TestEndpoints(t *testing.T) {
prevLevel := setDebugLogLevel()
defer restoreLogLevel(prevLevel)

k := mock.NewKube()
client, err := k.KubeClient()
g.Expect(err).To(BeNil())
k := kubelib.NewFakeClient()
client := k.Kube()

// Start the source.
s := newOrFail(t, k, metadata, nil)
@@ -340,6 +340,7 @@
}
acc.Clear()

var err error
eps := &corev1.Endpoints{
ObjectMeta: fakeObjectMeta,
Subsets: []corev1.EndpointSubset{
54 changes: 19 additions & 35 deletions galley/pkg/config/source/kube/apiserver/source_dynamic_test.go
@@ -14,7 +14,6 @@
package apiserver_test

import (
"errors"
"sync/atomic"
"testing"

@@ -24,14 +23,12 @@ import (
extfake "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
k8sRuntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic/fake"
k8sTesting "k8s.io/client-go/testing"

"istio.io/istio/galley/pkg/config/analysis/diag"
"istio.io/istio/galley/pkg/config/analysis/msg"
"istio.io/istio/galley/pkg/config/source/kube"
"istio.io/istio/galley/pkg/config/source/kube/apiserver"
"istio.io/istio/galley/pkg/config/source/kube/apiserver/status"
"istio.io/istio/galley/pkg/config/source/kube/rt"
@@ -42,14 +39,11 @@ import (
"istio.io/istio/pkg/config/resource"
"istio.io/istio/pkg/config/schema/collection"
resource2 "istio.io/istio/pkg/config/schema/resource"
kubelib "istio.io/istio/pkg/kube"
)

func TestNewSource(t *testing.T) {
k := &mock.Kube{}
for i := 0; i < 100; i++ {
_ = fakeClient(k)
}

k := kubelib.NewFakeClient()
r := basicmeta.MustGet().KubeCollections()

_ = newOrFail(t, k, r, nil)
@@ -150,8 +144,8 @@ func TestEvents(t *testing.T) {

obj := &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "testdata.istio.io/v1alpha1",
"kind": "Kind1",
"apiVersion": "networking.istio.io/v1beta1",
"kind": "Gateway",
"metadata": map[string]interface{}{
"name": "i1",
"namespace": "ns",
@@ -214,8 +208,8 @@ func TestEvents_WatchUpdatesStatusCtl(t *testing.T) {

obj := &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "testdata.istio.io/v1alpha1",
"kind": "Kind1",
"apiVersion": "networking.istio.io/v1beta1",
"kind": "Gateway",
"metadata": map[string]interface{}{
"name": "i1",
"namespace": "ns",
@@ -342,14 +336,16 @@ func TestEvents_NoneForDisabled(t *testing.T) {
}

func TestSource_WatcherFailsCreatingInformer(t *testing.T) {
k := mock.NewKube()
wcrd := mockCrdWatch(k.APIExtClientSet)
// This test doesn't seem to work well with the new fake client.
// Needs to be looked into.
//
// The linter wants an istio issue in the message...
t.Skip("https://github.com/istio/istio/issues/100000000")

w, wcrd, k := createMocks()
r := basicmeta.MustGet().KubeCollections()
addCrdEvents(wcrd, r.All())

k.AddResponse(nil, errors.New("no cheese found"))

// Create and start the source
s := newOrFail(t, k, r, nil)
// Start/stop when informer is not created. It should not crash or cause errors.
@@ -363,14 +359,8 @@
acc.Clear()
wcrd.Stop()

wcrd = mockCrdWatch(k.APIExtClientSet)
addCrdEvents(wcrd, r.All())

// Now start properly and get events
cl := fake.NewSimpleDynamicClient(k8sRuntime.NewScheme())
k.AddResponse(cl, nil)
w := mockWatch(cl)

s.Start()

obj := &unstructured.Unstructured{
@@ -417,12 +407,11 @@ func TestUpdateMessage_NoStatusController_Panic(t *testing.T) {
s.Update(diag.Messages{})
}

func newOrFail(t *testing.T, ifaces kube.Interfaces, r collection.Schemas, sc status.Controller) *apiserver.Source {
func newOrFail(t *testing.T, kubeClient kubelib.Client, r collection.Schemas, sc status.Controller) *apiserver.Source {
t.Helper()
o := apiserver.Options{
Schemas: r,
ResyncPeriod: 0,
Client: ifaces,
Client: kubeClient,
StatusController: sc,
}
s := apiserver.New(o)
@@ -440,11 +429,12 @@ func start(s *apiserver.Source) *fixtures.Accumulator {
return acc
}

func createMocks() (*mock.Watch, *mock.Watch, *mock.Kube) {
k := mock.NewKube()
cl := fakeClient(k)
func createMocks() (*mock.Watch, *mock.Watch, kubelib.Client) {
k := kubelib.NewFakeClient()
cl := k.Dynamic().(*fake.FakeDynamicClient)
ext := k.Ext().(*extfake.Clientset)
w := mockWatch(cl)
wcrd := mockCrdWatch(k.APIExtClientSet)
wcrd := mockCrdWatch(ext)
return w, wcrd, k
}

@@ -457,12 +447,6 @@ func addCrdEvents(w *mock.Watch, res []collection.Schema) {
}
}

func fakeClient(k *mock.Kube) *fake.FakeDynamicClient {
cl := fake.NewSimpleDynamicClient(k8sRuntime.NewScheme())
k.AddResponse(cl, nil)
return cl
}

func mockWatch(cl *fake.FakeDynamicClient) *mock.Watch {
w := mock.NewWatch()
cl.PrependWatchReactor("*", func(_ k8sTesting.Action) (handled bool, ret watch.Interface, err error) {
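For the dynamic-source tests, the fake clientsets that the watch reactors hook into are now obtained from the shared fake client by type assertion rather than registered as mock.Kube responses. A condensed sketch of that wiring, using only the accessors shown in createMocks above:

```go
k := kubelib.NewFakeClient()
dyn := k.Dynamic().(*fake.FakeDynamicClient) // k8s.io/client-go/dynamic/fake
ext := k.Ext().(*extfake.Clientset)          // apiextensions-apiserver fake clientset

w := mockWatch(dyn)       // prepend a watch reactor on the dynamic client
wcrd := mockCrdWatch(ext) // prepend a watch reactor for CRD discovery
```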
