From 46b563f65c456d3871b20adc5806fcb09e345514 Mon Sep 17 00:00:00 2001 From: Heba Elayoty Date: Tue, 4 Apr 2023 16:39:03 -0700 Subject: [PATCH 1/3] Watch configMap for deletion Signed-off-by: Heba Elayoty --- .gitignore | 3 +- cmd/exporter/app/server.go | 8 +-- go.mod | 2 +- pkg/exporter/configmap.go | 54 +++++++++++------- pkg/exporter/exporter.go | 110 +++++++++++++++++++++++++++++++------ 5 files changed, 135 insertions(+), 42 deletions(-) diff --git a/.gitignore b/.gitignore index 9a1377a..b45e7c4 100644 --- a/.gitignore +++ b/.gitignore @@ -29,4 +29,5 @@ _output .idea -/hack/tools/bin/ \ No newline at end of file +/hack/tools/bin/ +.DS_Store diff --git a/cmd/exporter/app/server.go b/cmd/exporter/app/server.go index ec61622..52a3030 100644 --- a/cmd/exporter/app/server.go +++ b/cmd/exporter/app/server.go @@ -29,7 +29,7 @@ import ( var ( //exporter command args - configmapName = flag.String("configmap-name", "carbon-intensity", "Configmap name - Default 'carbonIntensity'") + configMapName = flag.String("configmap-name", "carbon-intensity", "Configmap name - Default 'carbonIntensity'") patrolInterval = flag.String("patrol-interval", "12h", "Patrol interval in hours - Default every 12 hours") region = flag.String("region", "", "Region to get carbon intensity for - Required") ) @@ -53,7 +53,7 @@ func NewExporterCommand(stopChan <-chan struct{}) *cobra.Command { if err != nil { klog.Fatalf("unable to initialize command configs: %s", err.Error()) } - if err := Run(c.Complete(), *region, *patrolInterval, stopChan); err != nil { + if err := Run(c.Complete(), stopChan); err != nil { klog.Fatalf("unable to execute command : %s", err.Error()) } }, @@ -82,7 +82,7 @@ func NewExporterCommand(stopChan <-chan struct{}) *cobra.Command { return cmd } -func Run(cc *exporterconfig.CompletedConfig, region string, patrolInterval string, stopCh <-chan struct{}) error { +func Run(cc *exporterconfig.CompletedConfig, stopCh <-chan struct{}) error { // Init client SDK and exporter apiClient := client.NewAPIClient(client.NewConfiguration()) e, err := exporter.New(cc.ClusterClient, apiClient, cc.Recorder) @@ -142,7 +142,7 @@ func startExporter(p *exporter.Exporter, stopCh <-chan struct{}) func(context.Co } return func(ctx context.Context) { - p.Run(ctx, *configmapName, *region, ptDuration, stopCh) + p.Run(ctx, *configMapName, *region, ptDuration, stopCh) <-ctx.Done() } } diff --git a/go.mod b/go.mod index 0ce52ad..bc88074 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.19 require ( github.com/antihax/optional v1.0.0 github.com/spf13/cobra v1.6.0 + github.com/spf13/pflag v1.0.5 golang.org/x/net v0.7.0 golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b k8s.io/api v0.26.2 @@ -60,7 +61,6 @@ require ( github.com/prometheus/client_model v0.3.0 // indirect github.com/prometheus/common v0.37.0 // indirect github.com/prometheus/procfs v0.8.0 // indirect - github.com/spf13/pflag v1.0.5 // indirect github.com/stoewer/go-strcase v1.2.0 // indirect go.etcd.io/etcd/api/v3 v3.5.5 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.5 // indirect diff --git a/pkg/exporter/configmap.go b/pkg/exporter/configmap.go index acff8a2..fd34355 100644 --- a/pkg/exporter/configmap.go +++ b/pkg/exporter/configmap.go @@ -14,9 +14,9 @@ import ( "github.com/Azure/kubernetes-carbon-intensity-exporter/pkg/sdk/client" corev1 "k8s.io/api/core/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/json" + "k8s.io/apimachinery/pkg/watch" "k8s.io/klog/v2" ) @@ -24,7 +24,7 @@ var ( isImmutable = true ) -func (e *Exporter) CreateOrUpdateConfigMap(ctx context.Context, configmapName string, emissionForecast []client.EmissionsForecastDto) error { +func (e *Exporter) CreateOrUpdateConfigMap(ctx context.Context, configMapName string, emissionForecast []client.EmissionsForecastDto) error { if emissionForecast == nil { return errors.New("emission forecast cannot be nil") } @@ -42,8 +42,8 @@ func (e *Exporter) CreateOrUpdateConfigMap(ctx context.Context, configmapName st configMap := &corev1.ConfigMap{ ObjectMeta: v1.ObjectMeta{ - Name: configmapName, - Namespace: client.Namespace, + Name: configMapName, + Namespace: namespace, }, Immutable: &isImmutable, Data: map[string]string{ @@ -59,31 +59,45 @@ func (e *Exporter) CreateOrUpdateConfigMap(ctx context.Context, configmapName st }, } - currentConfig, err := e.clusterClient.CoreV1().ConfigMaps(client.Namespace).Get(ctx, configmapName, v1.GetOptions{}) + _, err = e.clusterClient.CoreV1(). + ConfigMaps(namespace). + Create(ctx, configMap, v1.CreateOptions{}) if err != nil { - if apierrors.IsNotFound(err) { - klog.Infof("configmap %s is not found", configmapName) - } else { - return err - } + return err + } + klog.Infof("configMap %s has been created", configMapName) + + return nil +} + +func (e *Exporter) GetGonfigMapWatch(ctx context.Context, configMapName string) watch.Interface { + watch, err := e.clusterClient.CoreV1(). + ConfigMaps(namespace). + Watch(ctx, v1.ListOptions{ + FieldSelector: "metadata.name=" + configMapName, + }) + if err != nil { + klog.Fatalf("unable to watch configMap %s, err: %v", configMapName, err) } + return watch +} - // Delete the old configmap if any. - if currentConfig != nil && currentConfig.Name != "" || !apierrors.IsNotFound(err) { - // Delete it first (as it is immutable) - klog.Info("deleting current the configmap") - err = e.clusterClient.CoreV1().ConfigMaps(client.Namespace).Delete(ctx, configmapName, v1.DeleteOptions{}) - if err != nil { - return err - } +func (e *Exporter) DeleteConfigmap(ctx context.Context, configMapName string) error { + _, err := e.clusterClient.CoreV1().ConfigMaps(namespace).Get(ctx, configMapName, v1.GetOptions{}) + if err != nil { + klog.Errorf("unable to get configMap %s", configMapName) + return err } - _, err = e.clusterClient.CoreV1().ConfigMaps(client.Namespace).Create(ctx, configMap, v1.CreateOptions{}) + err = e.clusterClient.CoreV1(). + ConfigMaps(namespace). + Delete(ctx, configMapName, v1.DeleteOptions{}) if err != nil { + klog.Errorf("unable to delete configMap %s", configMapName) return err } - klog.Infof("configmap %s has been Created", configmapName) + klog.Infof("configMap %s has been deleted", configMapName) return nil } diff --git a/pkg/exporter/exporter.go b/pkg/exporter/exporter.go index 9311287..9eae8d6 100644 --- a/pkg/exporter/exporter.go +++ b/pkg/exporter/exporter.go @@ -11,9 +11,11 @@ import ( "github.com/antihax/optional" "golang.org/x/net/context" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/util/wait" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/watch" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/retry" "k8s.io/klog/v2" ) @@ -32,36 +34,112 @@ func New(clusterClient clientset.Interface, apiClient *client.APIClient, recorde return b, nil } -func (e *Exporter) Run(ctx context.Context, configmapName, region string, patrolInterval time.Duration, stopChan <-chan struct{}) { - go wait.Until(func() { - e.Patrol(ctx, configmapName, region) - }, patrolInterval, stopChan) +func (e *Exporter) Run(ctx context.Context, configMapName, region string, patrolInterval time.Duration, stopChan <-chan struct{}) { + // create configMap first time + err := e.RefreshData(ctx, configMapName, region, stopChan) + if err != nil { + return + } + configMapWatch := e.GetGonfigMapWatch(ctx, configMapName) + + // configMapWatch is reassigned when the config map is deleted, + // so we want to make sure to stop the last instance. + defer func() { + configMapWatch.Stop() + }() + + refreshPatrol := time.NewTicker(patrolInterval) + defer refreshPatrol.Stop() + + for { + select { + // if the configMap got deleted by user + case event := <-configMapWatch.ResultChan(): + if event.Type == watch.Deleted { + err := e.Patrol(ctx, configMapName, region, stopChan) + if err != nil { + return + } + + configMapWatch.Stop() + // refresh watch after deletion + configMapWatch = e.GetGonfigMapWatch(ctx, configMapName) + + e.recorder.Eventf(&corev1.ObjectReference{ + Kind: "Pod", + Namespace: namespace, + Name: podName, + }, corev1.EventTypeWarning, "Configmap Deleted", "Configmap got deleted") + } + + // if refresh time elapsed + case <-refreshPatrol.C: + var err error + retry.OnError(retry.DefaultBackoff, func(err error) bool { + return true + }, func() error { + err = e.RefreshData(ctx, configMapName, region, stopChan) + return err + }) + if err != nil { + return + } + + configMapWatch.Stop() + // refresh watch after deletion + configMapWatch = e.GetGonfigMapWatch(ctx, configMapName) + + e.recorder.Eventf(&corev1.ObjectReference{ + Kind: "Pod", + Namespace: namespace, + Name: podName, + }, corev1.EventTypeNormal, "Configmap updated", "Configmap gets updated") + + // context got canceled or done + case <-ctx.Done(): + case <-stopChan: + return + } + } } -func (e *Exporter) Patrol(ctx context.Context, configmapName, region string) { - forecast, err := e.getCurrentForecastData(ctx, region) +func (e *Exporter) RefreshData(ctx context.Context, configMapName string, region string, stopChan <-chan struct{}) error { + err := e.DeleteConfigmap(ctx, configMapName) + if err != nil && !apierrors.IsNotFound(err) { + return err + } + err = e.Patrol(ctx, configMapName, region, stopChan) if err != nil { - return + return err } - err = e.CreateOrUpdateConfigMap(ctx, configmapName, forecast) + return nil +} + +func (e *Exporter) Patrol(ctx context.Context, configMapName, region string, stopChan <-chan struct{}) error { + forecast, err := e.getCurrentForecastData(ctx, region, stopChan) + if err != nil { + return err + } + + err = e.CreateOrUpdateConfigMap(ctx, configMapName, forecast) if err != nil { e.recorder.Eventf(&corev1.ObjectReference{ Kind: "Pod", - Namespace: client.Namespace, - Name: client.PodName, - }, corev1.EventTypeWarning, "Configmap Create", "Error while creating configmap") - klog.Errorf("an error has occurred while creating %s configmap, %s", configmapName, err.Error()) - return + Namespace: namespace, + Name: podName, + }, corev1.EventTypeWarning, "Configmap Create", "Error while creating configMap") + klog.Errorf("an error has occurred while creating %s configMap", configMapName) + return err } e.recorder.Eventf(&corev1.ObjectReference{ Kind: "Pod", Namespace: client.Namespace, Name: client.PodName, }, corev1.EventTypeNormal, "Exporter results", "Done retrieve data") - + return nil } -func (e *Exporter) getCurrentForecastData(ctx context.Context, region string) ([]client.EmissionsForecastDto, error) { +func (e *Exporter) getCurrentForecastData(ctx context.Context, region string, stopChan <-chan struct{}) ([]client.EmissionsForecastDto, error) { opt := &client.CarbonAwareApiGetCurrentForecastDataOpts{ DataStartAt: optional.EmptyTime(), DataEndAt: optional.EmptyTime(), From fd2b111d9e33d73007eb116941903e51abc0946c Mon Sep 17 00:00:00 2001 From: Heba Elayoty Date: Tue, 4 Apr 2023 18:05:53 -0700 Subject: [PATCH 2/3] Code review comments Signed-off-by: Heba Elayoty --- pkg/exporter/configmap.go | 5 ++++- pkg/exporter/exporter.go | 41 ++++++++++++++------------------------- 2 files changed, 19 insertions(+), 27 deletions(-) diff --git a/pkg/exporter/configmap.go b/pkg/exporter/configmap.go index fd34355..373dfe1 100644 --- a/pkg/exporter/configmap.go +++ b/pkg/exporter/configmap.go @@ -14,6 +14,7 @@ import ( "github.com/Azure/kubernetes-carbon-intensity-exporter/pkg/sdk/client" corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/json" "k8s.io/apimachinery/pkg/watch" @@ -85,7 +86,9 @@ func (e *Exporter) GetGonfigMapWatch(ctx context.Context, configMapName string) func (e *Exporter) DeleteConfigmap(ctx context.Context, configMapName string) error { _, err := e.clusterClient.CoreV1().ConfigMaps(namespace).Get(ctx, configMapName, v1.GetOptions{}) if err != nil { - klog.Errorf("unable to get configMap %s", configMapName) + if apierrors.IsNotFound(err) { // if configMap is not found, no errors will be returned. + return nil + } return err } diff --git a/pkg/exporter/exporter.go b/pkg/exporter/exporter.go index 9eae8d6..2976305 100644 --- a/pkg/exporter/exporter.go +++ b/pkg/exporter/exporter.go @@ -56,13 +56,12 @@ func (e *Exporter) Run(ctx context.Context, configMapName, region string, patrol // if the configMap got deleted by user case event := <-configMapWatch.ResultChan(): if event.Type == watch.Deleted { - err := e.Patrol(ctx, configMapName, region, stopChan) + err := e.RefreshData(ctx, configMapName, region, stopChan) if err != nil { return } - - configMapWatch.Stop() // refresh watch after deletion + configMapWatch.Stop() configMapWatch = e.GetGonfigMapWatch(ctx, configMapName) e.recorder.Eventf(&corev1.ObjectReference{ @@ -74,19 +73,13 @@ func (e *Exporter) Run(ctx context.Context, configMapName, region string, patrol // if refresh time elapsed case <-refreshPatrol.C: - var err error - retry.OnError(retry.DefaultBackoff, func(err error) bool { - return true - }, func() error { - err = e.RefreshData(ctx, configMapName, region, stopChan) - return err - }) + err := e.RefreshData(ctx, configMapName, region, stopChan) if err != nil { return } - configMapWatch.Stop() // refresh watch after deletion + configMapWatch.Stop() configMapWatch = e.GetGonfigMapWatch(ctx, configMapName) e.recorder.Eventf(&corev1.ObjectReference{ @@ -105,30 +98,26 @@ func (e *Exporter) Run(ctx context.Context, configMapName, region string, patrol func (e *Exporter) RefreshData(ctx context.Context, configMapName string, region string, stopChan <-chan struct{}) error { err := e.DeleteConfigmap(ctx, configMapName) - if err != nil && !apierrors.IsNotFound(err) { - return err - } - err = e.Patrol(ctx, configMapName, region, stopChan) - if err != nil { + if err != nil && !apierrors.IsNotFound(err) { // if configMap is not found, return err } - return nil -} -func (e *Exporter) Patrol(ctx context.Context, configMapName, region string, stopChan <-chan struct{}) error { - forecast, err := e.getCurrentForecastData(ctx, region, stopChan) - if err != nil { - return err - } - - err = e.CreateOrUpdateConfigMap(ctx, configMapName, forecast) + retry.OnError(retry.DefaultBackoff, func(err error) bool { + return true + }, func() error { + forecast, err := e.getCurrentForecastData(ctx, region, stopChan) + if err != nil { + return err + } + return e.CreateOrUpdateConfigMap(ctx, configMapName, forecast) + }) if err != nil { e.recorder.Eventf(&corev1.ObjectReference{ Kind: "Pod", Namespace: namespace, Name: podName, }, corev1.EventTypeWarning, "Configmap Create", "Error while creating configMap") - klog.Errorf("an error has occurred while creating %s configMap", configMapName) + klog.Errorf("an error has occurred while creating %s configMap, err: %s", configMapName, err.Error()) return err } e.recorder.Eventf(&corev1.ObjectReference{ From 046cf41dacf51339c0291b6ead4f415e5529be7e Mon Sep 17 00:00:00 2001 From: Heba Elayoty Date: Tue, 4 Apr 2023 21:15:51 -0700 Subject: [PATCH 3/3] use constant backoff Signed-off-by: Heba Elayoty --- charts/carbon-intensity-exporter/values.yaml | 4 +--- hack/e2e/manual-e2e-test.sh | 5 ++--- pkg/exporter/configmap.go | 10 ++++----- pkg/exporter/exporter.go | 22 +++++++++++++------- 4 files changed, 23 insertions(+), 18 deletions(-) diff --git a/charts/carbon-intensity-exporter/values.yaml b/charts/carbon-intensity-exporter/values.yaml index abe0b04..52e21d0 100644 --- a/charts/carbon-intensity-exporter/values.yaml +++ b/charts/carbon-intensity-exporter/values.yaml @@ -11,7 +11,7 @@ apiServer: password: password image: repository: ghcr.io/azure/kubernetes-carbon-intensity-exporter/server - pullPolicy: Always + pullPolicy: IfNotPresent tag: "0.1.0" carbonDataExporter: name: carbon-data-exporter @@ -32,11 +32,9 @@ rbac: serviceAccountName: carbon-aware-sa roleRef: cluster-admin -imagePullSecrets: [] nameOverride: "" fullnameOverride: "" podAnnotations: {} -podSecurityContext: {} nodeSelector: {} tolerations: [] affinity: {} diff --git a/hack/e2e/manual-e2e-test.sh b/hack/e2e/manual-e2e-test.sh index c9e1596..62c896a 100755 --- a/hack/e2e/manual-e2e-test.sh +++ b/hack/e2e/manual-e2e-test.sh @@ -26,8 +26,7 @@ error() { TMPDIR="" cleanup() { - helm uninstall carbon-e2e - kind delete clusters -n carbon-e2e + kind delete cluster -n carbon-e2e if [ -n "$TMPDIR" ]; then rm -rf "$TMPDIR" fi @@ -60,7 +59,7 @@ sleep 10 kubectl delete configmap carbon-intensity -n kube-system -sleep 10 +sleep 15 kubectl get configmap carbon-intensity -n kube-system kubectl describe configmap carbon-intensity -n kube-system diff --git a/pkg/exporter/configmap.go b/pkg/exporter/configmap.go index 373dfe1..c4a26f6 100644 --- a/pkg/exporter/configmap.go +++ b/pkg/exporter/configmap.go @@ -44,7 +44,7 @@ func (e *Exporter) CreateOrUpdateConfigMap(ctx context.Context, configMapName st configMap := &corev1.ConfigMap{ ObjectMeta: v1.ObjectMeta{ Name: configMapName, - Namespace: namespace, + Namespace: client.Namespace, }, Immutable: &isImmutable, Data: map[string]string{ @@ -61,7 +61,7 @@ func (e *Exporter) CreateOrUpdateConfigMap(ctx context.Context, configMapName st } _, err = e.clusterClient.CoreV1(). - ConfigMaps(namespace). + ConfigMaps(client.Namespace). Create(ctx, configMap, v1.CreateOptions{}) if err != nil { return err @@ -73,7 +73,7 @@ func (e *Exporter) CreateOrUpdateConfigMap(ctx context.Context, configMapName st func (e *Exporter) GetGonfigMapWatch(ctx context.Context, configMapName string) watch.Interface { watch, err := e.clusterClient.CoreV1(). - ConfigMaps(namespace). + ConfigMaps(client.Namespace). Watch(ctx, v1.ListOptions{ FieldSelector: "metadata.name=" + configMapName, }) @@ -84,7 +84,7 @@ func (e *Exporter) GetGonfigMapWatch(ctx context.Context, configMapName string) } func (e *Exporter) DeleteConfigmap(ctx context.Context, configMapName string) error { - _, err := e.clusterClient.CoreV1().ConfigMaps(namespace).Get(ctx, configMapName, v1.GetOptions{}) + _, err := e.clusterClient.CoreV1().ConfigMaps(client.Namespace).Get(ctx, configMapName, v1.GetOptions{}) if err != nil { if apierrors.IsNotFound(err) { // if configMap is not found, no errors will be returned. return nil @@ -93,7 +93,7 @@ func (e *Exporter) DeleteConfigmap(ctx context.Context, configMapName string) er } err = e.clusterClient.CoreV1(). - ConfigMaps(namespace). + ConfigMaps(client.Namespace). Delete(ctx, configMapName, v1.DeleteOptions{}) if err != nil { klog.Errorf("unable to delete configMap %s", configMapName) diff --git a/pkg/exporter/exporter.go b/pkg/exporter/exporter.go index 2976305..539ff84 100644 --- a/pkg/exporter/exporter.go +++ b/pkg/exporter/exporter.go @@ -12,6 +12,7 @@ import ( "golang.org/x/net/context" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/record" @@ -19,6 +20,13 @@ import ( "k8s.io/klog/v2" ) +var ( + constantBackoff = wait.Backoff{ + Duration: 3 * time.Second, + Steps: 10, + } +) + type Exporter struct { clusterClient clientset.Interface apiClient *client.APIClient @@ -66,8 +74,8 @@ func (e *Exporter) Run(ctx context.Context, configMapName, region string, patrol e.recorder.Eventf(&corev1.ObjectReference{ Kind: "Pod", - Namespace: namespace, - Name: podName, + Namespace: client.Namespace, + Name: client.PodName, }, corev1.EventTypeWarning, "Configmap Deleted", "Configmap got deleted") } @@ -84,8 +92,8 @@ func (e *Exporter) Run(ctx context.Context, configMapName, region string, patrol e.recorder.Eventf(&corev1.ObjectReference{ Kind: "Pod", - Namespace: namespace, - Name: podName, + Namespace: client.Namespace, + Name: client.PodName, }, corev1.EventTypeNormal, "Configmap updated", "Configmap gets updated") // context got canceled or done @@ -102,7 +110,7 @@ func (e *Exporter) RefreshData(ctx context.Context, configMapName string, region return err } - retry.OnError(retry.DefaultBackoff, func(err error) bool { + retry.OnError(constantBackoff, func(err error) bool { return true }, func() error { forecast, err := e.getCurrentForecastData(ctx, region, stopChan) @@ -114,8 +122,8 @@ func (e *Exporter) RefreshData(ctx context.Context, configMapName string, region if err != nil { e.recorder.Eventf(&corev1.ObjectReference{ Kind: "Pod", - Namespace: namespace, - Name: podName, + Namespace: client.Namespace, + Name: client.PodName, }, corev1.EventTypeWarning, "Configmap Create", "Error while creating configMap") klog.Errorf("an error has occurred while creating %s configMap, err: %s", configMapName, err.Error()) return err