Skip to content

Commit

Permalink
Split remaining Kubernetes imports from agent
Browse files Browse the repository at this point in the history
* Move mesh configmap watcher out of the core `mesh` package
* Move security kubernetes `util`s to its own package

This drops the Kubernetes dependency for the agent completely.

This essentially completes istio#26232
for the agent (but not Istiod).
  • Loading branch information
howardjohn committed Nov 18, 2020
1 parent 84107a2 commit ac314f6
Show file tree
Hide file tree
Showing 12 changed files with 226 additions and 174 deletions.
3 changes: 2 additions & 1 deletion pilot/pkg/bootstrap/mesh.go
Expand Up @@ -19,6 +19,7 @@ import (
"os"

"istio.io/istio/pkg/config/mesh"
"istio.io/istio/pkg/config/mesh/kubemesh"
"istio.io/istio/pkg/util/gogoprotomarshal"
"istio.io/pkg/filewatcher"
"istio.io/pkg/log"
Expand Down Expand Up @@ -66,7 +67,7 @@ func (s *Server) initMeshConfiguration(args *PilotArgs, fileWatcher filewatcher.
// Watch the istio ConfigMap for mesh config changes.
// This may be necessary for external Istiod.
configMapName := getMeshConfigMapName(args.Revision)
s.environment.Watcher = mesh.NewConfigMapWatcher(
s.environment.Watcher = kubemesh.NewConfigMapWatcher(
s.kubeClient, args.Namespace, configMapName, configMapKey)
}

Expand Down
1 change: 1 addition & 0 deletions pilot/pkg/networking/core/v1alpha3/fake.go
@@ -1,3 +1,4 @@
// +build !agent
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand Down
Expand Up @@ -26,7 +26,7 @@ import (

"istio.io/istio/pkg/kube"
"istio.io/istio/pkg/queue"
certutil "istio.io/istio/security/pkg/util"
"istio.io/istio/security/pkg/k8s"
"istio.io/pkg/log"
)

Expand Down Expand Up @@ -134,7 +134,7 @@ func (nc *NamespaceController) insertDataForNamespace(ns string) error {
Namespace: ns,
Labels: configMapLabel,
}
return certutil.InsertDataToConfigMap(nc.client, meta, nc.getData())
return k8s.InsertDataToConfigMap(nc.client, meta, nc.getData())
}

// On namespace change, update the config map.
Expand All @@ -148,7 +148,7 @@ func (nc *NamespaceController) namespaceChange(ns *v1.Namespace) error {

// When a config map is changed, merge the data into the configmap
func (nc *NamespaceController) configMapChange(cm *v1.ConfigMap) error {
if err := certutil.UpdateDataInConfigMap(nc.client, cm.DeepCopy(), nc.getData()); err != nil {
if err := k8s.UpdateDataInConfigMap(nc.client, cm.DeepCopy(), nc.getData()); err != nil {
return fmt.Errorf("error when inserting CA cert to configmap %v: %v", cm.Name, err)
}
return nil
Expand Down
Expand Up @@ -27,7 +27,7 @@ import (

"istio.io/istio/pkg/kube"
"istio.io/istio/pkg/test/util/retry"
"istio.io/istio/security/pkg/util"
"istio.io/istio/security/pkg/k8s"
)

func TestNamespaceController(t *testing.T) {
Expand All @@ -45,7 +45,7 @@ func TestNamespaceController(t *testing.T) {
expectConfigMap(t, client, "foo", testdata)

newData := map[string]string{"key": "value", "foo": "bar"}
if err := util.InsertDataToConfigMap(client.CoreV1(), metav1.ObjectMeta{Name: CACertNamespaceConfigMap, Namespace: "foo"}, newData); err != nil {
if err := k8s.InsertDataToConfigMap(client.CoreV1(), metav1.ObjectMeta{Name: CACertNamespaceConfigMap, Namespace: "foo"}, newData); err != nil {
t.Fatal(err)
}
expectConfigMap(t, client, "foo", newData)
Expand Down
1 change: 1 addition & 0 deletions pilot/pkg/xds/fake.go
@@ -1,3 +1,4 @@
// +build !agent
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand Down
70 changes: 70 additions & 0 deletions pkg/config/mesh/kubemesh/watcher.go
@@ -0,0 +1,70 @@
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kubemesh

import (
"fmt"

v1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"

meshconfig "istio.io/api/mesh/v1alpha1"
"istio.io/istio/pkg/config/mesh"
"istio.io/istio/pkg/kube"
"istio.io/istio/pkg/kube/configmapwatcher"
"istio.io/pkg/log"
)

// NewConfigMapWatcher creates a new Watcher for changes to the given ConfigMap.
func NewConfigMapWatcher(client kube.Client, namespace, name, key string) mesh.Watcher {
defaultMesh := mesh.DefaultMeshConfig()
w := &mesh.InternalWatcher{MeshConfig: &defaultMesh}
c := configmapwatcher.NewController(client, namespace, name, func(cm *v1.ConfigMap) {
meshConfig, err := ReadConfigMap(cm, key)
if err != nil {
// Keep the last known config in case there's a misconfiguration issue.
log.Warnf("failed to read mesh config from ConfigMap: %v", err)
return
}
w.HandleMeshConfig(meshConfig)
})

stop := make(chan struct{})
go c.Run(stop)
// Ensure the ConfigMap is initially loaded if present.
cache.WaitForCacheSync(stop, c.HasSynced)
return w
}

func ReadConfigMap(cm *v1.ConfigMap, key string) (*meshconfig.MeshConfig, error) {
if cm == nil {
log.Info("no ConfigMap found, using default MeshConfig config")
defaultMesh := mesh.DefaultMeshConfig()
return &defaultMesh, nil
}

cfgYaml, exists := cm.Data[key]
if !exists {
return nil, fmt.Errorf("missing ConfigMap key %q", key)
}

meshConfig, err := mesh.ApplyMeshConfigDefaults(cfgYaml)
if err != nil {
return nil, fmt.Errorf("failed reading MeshConfig config: %v. YAML:\n%s", err, cfgYaml)
}

log.Info("Loaded MeshConfig config from Kubernetes API server.")
return meshConfig, nil
}
126 changes: 126 additions & 0 deletions pkg/config/mesh/kubemesh/watcher_test.go
@@ -0,0 +1,126 @@
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kubemesh

import (
"context"
"fmt"
"sync"
"testing"
"time"

. "github.com/onsi/gomega"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

meshconfig "istio.io/api/mesh/v1alpha1"
"istio.io/istio/pkg/config/mesh"
"istio.io/istio/pkg/kube"
)

const (
namespace string = "istio-system"
name string = "istio"
key string = "MeshConfig"
)

func makeConfigMap(resourceVersion string, data map[string]string) *v1.ConfigMap {
return &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Name: name,
ResourceVersion: resourceVersion,
},
Data: data,
}
}

func TestNewConfigMapWatcher(t *testing.T) {
yaml := "trustDomain: something.new"
m, err := mesh.ApplyMeshConfigDefaults(yaml)
if err != nil {
t.Fatal(err)
}

cm := makeConfigMap("1", map[string]string{
key: yaml,
})
badCM := makeConfigMap("2", map[string]string{
"other-key": yaml,
})
badCM2 := makeConfigMap("3", map[string]string{
key: "bad yaml",
})

client := kube.NewFakeClient()
cms := client.Kube().CoreV1().ConfigMaps(namespace)
w := NewConfigMapWatcher(client, namespace, name, key)

defaultMesh := mesh.DefaultMeshConfig()

var mu sync.Mutex
newM := &defaultMesh
w.AddMeshHandler(func() {
mu.Lock()
defer mu.Unlock()
newM = w.Mesh()
})

steps := []struct {
added *v1.ConfigMap
updated *v1.ConfigMap
deleted *v1.ConfigMap

expect *meshconfig.MeshConfig
}{
{expect: &defaultMesh},
{added: cm, expect: m},

// Handle misconfiguration errors.
{updated: badCM, expect: m},
{updated: cm, expect: m},
{updated: badCM2, expect: m},
{updated: badCM, expect: m},
{updated: cm, expect: m},

{deleted: cm, expect: &defaultMesh},
{added: badCM, expect: &defaultMesh},
}

for i, step := range steps {
t.Run(fmt.Sprintf("[%v]", i), func(t *testing.T) {
g := NewWithT(t)

switch {
case step.added != nil:
_, err := cms.Create(context.TODO(), step.added, metav1.CreateOptions{})
g.Expect(err).Should(BeNil())
case step.updated != nil:
_, err := cms.Update(context.TODO(), step.updated, metav1.UpdateOptions{})
g.Expect(err).Should(BeNil())
case step.deleted != nil:
g.Expect(cms.Delete(context.TODO(), step.deleted.Name, metav1.DeleteOptions{})).
Should(Succeed())
}

g.Eventually(w.Mesh).Should(Equal(step.expect))
g.Eventually(func() *meshconfig.MeshConfig {
mu.Lock()
defer mu.Unlock()
return newM
}, time.Second).Should(Equal(step.expect))
})
}
}

0 comments on commit ac314f6

Please sign in to comment.