Skip to content

Commit

Permalink
feat: configurable burst and QPS in k8s client
Browse files Browse the repository at this point in the history
  • Loading branch information
paullaffitte authored and npdgm committed Apr 22, 2024
1 parent cc3b0c0 commit e002b89
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 9 deletions.
13 changes: 12 additions & 1 deletion cmd/x509-certificate-exporter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

_ "github.com/KimMachineGun/automemlimit"
_ "go.uber.org/automaxprocs"
"k8s.io/client-go/util/flowcontrol"

"github.com/enix/x509-certificate-exporter/v3/internal"
getopt "github.com/pborman/getopt/v2"
Expand Down Expand Up @@ -39,6 +40,9 @@ func main() {
maxCacheDuration := durationFlag(0)
getopt.FlagLong(&maxCacheDuration, "max-cache-duration", 0, "maximum cache duration for kube secrets. cache is per namespace and randomized to avoid massive requests.")

rateLimitQPS := getopt.IntLong("kube-api-rate-limit-qps", 0, 0, "Kubernetes API request rate limit")
rateLimitBurst := getopt.IntLong("kube-api-rate-limit-burst", 0, 0, "Kubernetes API request burst")

files := stringArrayFlag{}
getopt.FlagLong(&files, "watch-file", 'f', "watch one or more x509 certificate file")

Expand Down Expand Up @@ -131,7 +135,14 @@ func main() {
configpath = defaultKubeConfig
}

err := exporter.ConnectToKubernetesCluster(configpath)
// Set rate limiter only if both QPS and burst are set
var rateLimiter flowcontrol.RateLimiter
if *rateLimitQPS > 0 && *rateLimitBurst > 0 {
log.Infof("setting Kubernetes API rate limiter to %d QPS and %d burst", *rateLimitQPS, *rateLimitBurst)
rateLimiter = flowcontrol.NewTokenBucketRateLimiter(float32(*rateLimitQPS), *rateLimitBurst)
}

err := exporter.ConnectToKubernetesCluster(configpath, rateLimiter)
if err != nil {
log.Fatal(err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ spec:
{{- else }}
- --max-cache-duration=0
{{- end }}
{{- with .Values.secretsExporter.kubeApiRateLimits }}
- --kube-api-rate-limit-qps={{ .qps }}
- --kube-api-rate-limit-burst={{ .burst }}
{{- end }}
{{- if .Values.exposePerCertificateErrorMetrics }}
- --expose-per-cert-error-metrics
{{- end }}
Expand Down
5 changes: 5 additions & 0 deletions deploy/charts/x509-certificate-exporter/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ secretsExporter:
# -- Maximum time an object can stay in cache unrefreshed (seconds) - it will be at least half of that
maxDuration: 300

kubeApiRateLimits: {}
# -- Try higher values if querying secrets takes a long time because of throttling
# qps: 5
# burst: 10

# -- Additional environment variables for container
env: []
# - name: GOMAXPROCS
Expand Down
11 changes: 8 additions & 3 deletions internal/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/flowcontrol"
)

// ConnectToKubernetesCluster : Try connect to a cluster from inside if path is empty,
// otherwise try loading the kubeconfig at path "path"
func (exporter *Exporter) ConnectToKubernetesCluster(path string) error {
func (exporter *Exporter) ConnectToKubernetesCluster(path string, rateLimiter flowcontrol.RateLimiter) error {
var err error
exporter.kubeClient, err = connectToKubernetesCluster(path, false)
exporter.kubeClient, err = connectToKubernetesCluster(path, false, rateLimiter)
return err
}

Expand Down Expand Up @@ -232,7 +233,7 @@ func (exporter *Exporter) shrinkSecret(secret v1.Secret) v1.Secret {
return secret
}

func connectToKubernetesCluster(kubeconfigPath string, insecure bool) (*kubernetes.Clientset, error) {
func connectToKubernetesCluster(kubeconfigPath string, insecure bool, rateLimiter flowcontrol.RateLimiter) (*kubernetes.Clientset, error) {
config, err := parseKubeConfig(kubeconfigPath)
if err != nil {
return nil, err
Expand All @@ -243,6 +244,10 @@ func connectToKubernetesCluster(kubeconfigPath string, insecure bool) (*kubernet
config.TLSClientConfig.CAData = nil
}

if rateLimiter != nil {
config.RateLimiter = rateLimiter
}

return getKubeClient(config)
}

Expand Down
10 changes: 5 additions & 5 deletions internal/kubernetes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestMain(m *testing.M) {
panic(err)
}

sharedKubeClient, err = connectToKubernetesCluster("kubeconfig", true)
sharedKubeClient, err = connectToKubernetesCluster("kubeconfig", true, nil)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -286,7 +286,7 @@ func TestKubeMetricLabels(t *testing.T) {
}

func TestKubeNamespaceListFailure(t *testing.T) {
kubeClient, err := connectToKubernetesCluster("kubeconfig.x509-certificate-exporter", true)
kubeClient, err := connectToKubernetesCluster("kubeconfig.x509-certificate-exporter", true, nil)
if err != nil {
panic(err)
}
Expand All @@ -301,7 +301,7 @@ func TestKubeNamespaceListFailure(t *testing.T) {
}

func TestKubeSecretsListFailure(t *testing.T) {
kubeClient, err := connectToKubernetesCluster("kubeconfig.x509-certificate-exporter-list", true)
kubeClient, err := connectToKubernetesCluster("kubeconfig.x509-certificate-exporter-list", true, nil)
if err != nil {
panic(err)
}
Expand All @@ -316,7 +316,7 @@ func TestKubeSecretsListFailure(t *testing.T) {
}

func TestKubeInvalidConfig(t *testing.T) {
_, err := connectToKubernetesCluster("../test/kubeconfig-corrupted.yml", true)
_, err := connectToKubernetesCluster("../test/kubeconfig-corrupted.yml", true, nil)
assert.NotNil(t, err)
}

Expand Down Expand Up @@ -370,7 +370,7 @@ func TestKubeEmptyStringKey(t *testing.T) {

func TestKubeConnectionFromInsideFailure(t *testing.T) {
e := &Exporter{}
err := e.ConnectToKubernetesCluster("")
err := e.ConnectToKubernetesCluster("", nil)
assert.NotNil(t, err)
}

Expand Down

0 comments on commit e002b89

Please sign in to comment.