From 497087a4fd7afcb016751f606725e2670a372385 Mon Sep 17 00:00:00 2001 From: Jiaxun Song Date: Sun, 16 Jun 2024 07:52:58 +0000 Subject: [PATCH] use Pod informer to avoid Pod GETs --- cmd/csi_driver/main.go | 23 ++++++------ deploy/base/node/node_setup.yaml | 2 +- pkg/cloud_provider/clientset/clientset.go | 44 ++++++++++++++++++----- pkg/cloud_provider/clientset/fake.go | 4 ++- pkg/csi_driver/node.go | 2 +- test/e2e/e2e_test.go | 2 +- 6 files changed, 55 insertions(+), 22 deletions(-) diff --git a/cmd/csi_driver/main.go b/cmd/csi_driver/main.go index 697917bd..b83b4d2f 100644 --- a/cmd/csi_driver/main.go +++ b/cmd/csi_driver/main.go @@ -35,14 +35,15 @@ import ( ) var ( - endpoint = flag.String("endpoint", "unix:/tmp/csi.sock", "CSI endpoint") - nodeID = flag.String("nodeid", "", "node id") - runController = flag.Bool("controller", false, "run controller service") - runNode = flag.Bool("node", false, "run node service") - kubeconfigPath = flag.String("kubeconfig-path", "", "The kubeconfig path.") - identityPool = flag.String("identity-pool", "", "The Identity Pool to authenticate with GCS API.") - identityProvider = flag.String("identity-provider", "", "The Identity Provider to authenticate with GCS API.") - enableProfiling = flag.Bool("enable-profiling", false, "enable the golang pprof at port 6060") + endpoint = flag.String("endpoint", "unix:/tmp/csi.sock", "CSI endpoint") + nodeID = flag.String("nodeid", "", "node id") + runController = flag.Bool("controller", false, "run controller service") + runNode = flag.Bool("node", false, "run node service") + kubeconfigPath = flag.String("kubeconfig-path", "", "The kubeconfig path.") + identityPool = flag.String("identity-pool", "", "The Identity Pool to authenticate with GCS API.") + identityProvider = flag.String("identity-provider", "", "The Identity Provider to authenticate with GCS API.") + enableProfiling = flag.Bool("enable-profiling", false, "enable the golang pprof at port 6060") + informerResyncDurationSec = flag.Int("informer-resync-duration-sec", 1800, "informer resync duration in seconds") // These are set at compile time. version = "unknown" @@ -73,9 +74,9 @@ func main() { }() } - clientset, err := clientset.New(*kubeconfigPath) + clientset, err := clientset.New(*kubeconfigPath, *informerResyncDurationSec) if err != nil { - klog.Fatal("Failed to configure k8s client") + klog.Fatalf("Failed to configure k8s client: %v", err) } meta, err := metadata.NewMetadataService(*identityPool, *identityProvider) @@ -95,6 +96,8 @@ func main() { klog.Fatalf("NodeID cannot be empty for node service") } + clientset.ConfigurePodLister(*nodeID) + mounter, err = csimounter.New("") if err != nil { klog.Fatalf("Failed to prepare CSI mounter: %v", err) diff --git a/deploy/base/node/node_setup.yaml b/deploy/base/node/node_setup.yaml index f2a4ea8f..e949353a 100755 --- a/deploy/base/node/node_setup.yaml +++ b/deploy/base/node/node_setup.yaml @@ -35,7 +35,7 @@ metadata: rules: - apiGroups: [""] resources: ["pods"] - verbs: ["get"] + verbs: ["get", "watch", "list"] - apiGroups: [""] resources: ["serviceaccounts"] verbs: ["get"] diff --git a/pkg/cloud_provider/clientset/clientset.go b/pkg/cloud_provider/clientset/clientset.go index 2739ffd2..c1c63e8d 100644 --- a/pkg/cloud_provider/clientset/clientset.go +++ b/pkg/cloud_provider/clientset/clientset.go @@ -19,19 +19,24 @@ package clientset import ( "context" + "errors" "fmt" + "time" authenticationv1 "k8s.io/api/authentication/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" + listersv1 "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" "k8s.io/klog/v2" ) type Interface interface { - GetPod(ctx context.Context, namespace, name string) (*corev1.Pod, error) + ConfigurePodLister(nodeName string) + GetPod(namespace, name string) (*corev1.Pod, error) CreateServiceAccountToken(ctx context.Context, namespace, name string, tokenRequest *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) GetGCPServiceAccountName(ctx context.Context, namespace, name string) (string, error) } @@ -42,10 +47,12 @@ type PodInfo struct { } type Clientset struct { - k8sClients kubernetes.Interface + k8sClients kubernetes.Interface + podLister listersv1.PodLister + informerResyncDurationSec int } -func New(kubeconfigPath string) (Interface, error) { +func New(kubeconfigPath string, informerResyncDurationSec int) (Interface, error) { var err error var rc *rest.Config if kubeconfigPath != "" { @@ -56,19 +63,40 @@ func New(kubeconfigPath string) (Interface, error) { rc, err = rest.InClusterConfig() } if err != nil { - klog.Fatalf("Failed to read kubeconfig: %v", err) + return nil, fmt.Errorf("failed to read kubeconfig: %w", err) } clientset, err := kubernetes.NewForConfig(rc) if err != nil { - klog.Fatal("failed to configure k8s client") + return nil, fmt.Errorf("failed to configure k8s client: %w", err) } - return &Clientset{k8sClients: clientset}, nil + return &Clientset{k8sClients: clientset, informerResyncDurationSec: informerResyncDurationSec}, nil } -func (c *Clientset) GetPod(ctx context.Context, namespace, name string) (*corev1.Pod, error) { - return c.k8sClients.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{}) +func (c *Clientset) ConfigurePodLister(nodeName string) { + informerFactory := informers.NewSharedInformerFactoryWithOptions( + c.k8sClients, + time.Duration(c.informerResyncDurationSec)*time.Second, + informers.WithTweakListOptions(func(options *metav1.ListOptions) { + options.FieldSelector = "spec.nodeName=" + nodeName + }), + ) + podLister := informerFactory.Core().V1().Pods().Lister() + + ctx := context.Background() + informerFactory.Start(ctx.Done()) + informerFactory.WaitForCacheSync(ctx.Done()) + + c.podLister = podLister +} + +func (c *Clientset) GetPod(namespace, name string) (*corev1.Pod, error) { + if c.podLister == nil { + return nil, errors.New("pod informer is not ready") + } + + return c.podLister.Pods(namespace).Get(name) } func (c *Clientset) CreateServiceAccountToken(ctx context.Context, namespace, name string, tokenRequest *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) { diff --git a/pkg/cloud_provider/clientset/fake.go b/pkg/cloud_provider/clientset/fake.go index b104ed3b..fcacce3a 100644 --- a/pkg/cloud_provider/clientset/fake.go +++ b/pkg/cloud_provider/clientset/fake.go @@ -28,7 +28,9 @@ import ( type FakeClientset struct{} -func (c *FakeClientset) GetPod(_ context.Context, namespace, name string) (*corev1.Pod, error) { +func (c *FakeClientset) ConfigurePodLister(_ string) {} + +func (c *FakeClientset) GetPod(namespace, name string) (*corev1.Pod, error) { config := webhook.FakeConfig() pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ diff --git a/pkg/csi_driver/node.go b/pkg/csi_driver/node.go index 91802be2..de11ebb6 100644 --- a/pkg/csi_driver/node.go +++ b/pkg/csi_driver/node.go @@ -166,7 +166,7 @@ func (s *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublish } // Check if the sidecar container was injected into the Pod - pod, err := s.k8sClients.GetPod(ctx, vc[VolumeContextKeyPodNamespace], vc[VolumeContextKeyPodName]) + pod, err := s.k8sClients.GetPod(vc[VolumeContextKeyPodNamespace], vc[VolumeContextKeyPodName]) if err != nil { return nil, status.Errorf(codes.NotFound, "failed to get pod: %v", err) } diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 7c46233f..2ba6efbf 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -57,7 +57,7 @@ var _ = func() bool { flag.Parse() framework.AfterReadingAllFlags(&framework.TestContext) - c, err = clientset.New(framework.TestContext.KubeConfig) + c, err = clientset.New(framework.TestContext.KubeConfig, 0) if err != nil { klog.Fatalf("Failed to configure k8s client: %v", err) }