diff --git a/.gitignore b/.gitignore index 9a1377a..b45e7c4 100644 --- a/.gitignore +++ b/.gitignore @@ -29,4 +29,5 @@ _output .idea -/hack/tools/bin/ \ No newline at end of file +/hack/tools/bin/ +.DS_Store diff --git a/charts/carbon-intensity-exporter/values.yaml b/charts/carbon-intensity-exporter/values.yaml index abe0b04..52e21d0 100644 --- a/charts/carbon-intensity-exporter/values.yaml +++ b/charts/carbon-intensity-exporter/values.yaml @@ -11,7 +11,7 @@ apiServer: password: password image: repository: ghcr.io/azure/kubernetes-carbon-intensity-exporter/server - pullPolicy: Always + pullPolicy: IfNotPresent tag: "0.1.0" carbonDataExporter: name: carbon-data-exporter @@ -32,11 +32,9 @@ rbac: serviceAccountName: carbon-aware-sa roleRef: cluster-admin -imagePullSecrets: [] nameOverride: "" fullnameOverride: "" podAnnotations: {} -podSecurityContext: {} nodeSelector: {} tolerations: [] affinity: {} diff --git a/cmd/exporter/app/server.go b/cmd/exporter/app/server.go index ec61622..52a3030 100644 --- a/cmd/exporter/app/server.go +++ b/cmd/exporter/app/server.go @@ -29,7 +29,7 @@ import ( var ( //exporter command args - configmapName = flag.String("configmap-name", "carbon-intensity", "Configmap name - Default 'carbonIntensity'") + configMapName = flag.String("configmap-name", "carbon-intensity", "Configmap name - Default 'carbonIntensity'") patrolInterval = flag.String("patrol-interval", "12h", "Patrol interval in hours - Default every 12 hours") region = flag.String("region", "", "Region to get carbon intensity for - Required") ) @@ -53,7 +53,7 @@ func NewExporterCommand(stopChan <-chan struct{}) *cobra.Command { if err != nil { klog.Fatalf("unable to initialize command configs: %s", err.Error()) } - if err := Run(c.Complete(), *region, *patrolInterval, stopChan); err != nil { + if err := Run(c.Complete(), stopChan); err != nil { klog.Fatalf("unable to execute command : %s", err.Error()) } }, @@ -82,7 +82,7 @@ func NewExporterCommand(stopChan <-chan struct{}) *cobra.Command { return cmd } -func Run(cc *exporterconfig.CompletedConfig, region string, patrolInterval string, stopCh <-chan struct{}) error { +func Run(cc *exporterconfig.CompletedConfig, stopCh <-chan struct{}) error { // Init client SDK and exporter apiClient := client.NewAPIClient(client.NewConfiguration()) e, err := exporter.New(cc.ClusterClient, apiClient, cc.Recorder) @@ -142,7 +142,7 @@ func startExporter(p *exporter.Exporter, stopCh <-chan struct{}) func(context.Co } return func(ctx context.Context) { - p.Run(ctx, *configmapName, *region, ptDuration, stopCh) + p.Run(ctx, *configMapName, *region, ptDuration, stopCh) <-ctx.Done() } } diff --git a/go.mod b/go.mod index 0ce52ad..bc88074 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.19 require ( github.com/antihax/optional v1.0.0 github.com/spf13/cobra v1.6.0 + github.com/spf13/pflag v1.0.5 golang.org/x/net v0.7.0 golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b k8s.io/api v0.26.2 @@ -60,7 +61,6 @@ require ( github.com/prometheus/client_model v0.3.0 // indirect github.com/prometheus/common v0.37.0 // indirect github.com/prometheus/procfs v0.8.0 // indirect - github.com/spf13/pflag v1.0.5 // indirect github.com/stoewer/go-strcase v1.2.0 // indirect go.etcd.io/etcd/api/v3 v3.5.5 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.5 // indirect diff --git a/hack/e2e/manual-e2e-test.sh b/hack/e2e/manual-e2e-test.sh index c9e1596..62c896a 100755 --- a/hack/e2e/manual-e2e-test.sh +++ b/hack/e2e/manual-e2e-test.sh @@ -26,8 +26,7 @@ error() { TMPDIR="" cleanup() { - helm uninstall carbon-e2e - kind delete clusters -n carbon-e2e + kind delete cluster -n carbon-e2e if [ -n "$TMPDIR" ]; then rm -rf "$TMPDIR" fi @@ -60,7 +59,7 @@ sleep 10 kubectl delete configmap carbon-intensity -n kube-system -sleep 10 +sleep 15 kubectl get configmap carbon-intensity -n kube-system kubectl describe configmap carbon-intensity -n kube-system diff --git a/pkg/exporter/configmap.go b/pkg/exporter/configmap.go index acff8a2..c4a26f6 100644 --- a/pkg/exporter/configmap.go +++ b/pkg/exporter/configmap.go @@ -17,6 +17,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/json" + "k8s.io/apimachinery/pkg/watch" "k8s.io/klog/v2" ) @@ -24,7 +25,7 @@ var ( isImmutable = true ) -func (e *Exporter) CreateOrUpdateConfigMap(ctx context.Context, configmapName string, emissionForecast []client.EmissionsForecastDto) error { +func (e *Exporter) CreateOrUpdateConfigMap(ctx context.Context, configMapName string, emissionForecast []client.EmissionsForecastDto) error { if emissionForecast == nil { return errors.New("emission forecast cannot be nil") } @@ -42,7 +43,7 @@ func (e *Exporter) CreateOrUpdateConfigMap(ctx context.Context, configmapName st configMap := &corev1.ConfigMap{ ObjectMeta: v1.ObjectMeta{ - Name: configmapName, + Name: configMapName, Namespace: client.Namespace, }, Immutable: &isImmutable, @@ -59,31 +60,47 @@ func (e *Exporter) CreateOrUpdateConfigMap(ctx context.Context, configmapName st }, } - currentConfig, err := e.clusterClient.CoreV1().ConfigMaps(client.Namespace).Get(ctx, configmapName, v1.GetOptions{}) + _, err = e.clusterClient.CoreV1(). + ConfigMaps(client.Namespace). + Create(ctx, configMap, v1.CreateOptions{}) if err != nil { - if apierrors.IsNotFound(err) { - klog.Infof("configmap %s is not found", configmapName) - } else { - return err - } + return err } + klog.Infof("configMap %s has been created", configMapName) + + return nil +} - // Delete the old configmap if any. - if currentConfig != nil && currentConfig.Name != "" || !apierrors.IsNotFound(err) { - // Delete it first (as it is immutable) - klog.Info("deleting current the configmap") - err = e.clusterClient.CoreV1().ConfigMaps(client.Namespace).Delete(ctx, configmapName, v1.DeleteOptions{}) - if err != nil { - return err +func (e *Exporter) GetGonfigMapWatch(ctx context.Context, configMapName string) watch.Interface { + watch, err := e.clusterClient.CoreV1(). + ConfigMaps(client.Namespace). + Watch(ctx, v1.ListOptions{ + FieldSelector: "metadata.name=" + configMapName, + }) + if err != nil { + klog.Fatalf("unable to watch configMap %s, err: %v", configMapName, err) + } + return watch +} + +func (e *Exporter) DeleteConfigmap(ctx context.Context, configMapName string) error { + _, err := e.clusterClient.CoreV1().ConfigMaps(client.Namespace).Get(ctx, configMapName, v1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { // if configMap is not found, no errors will be returned. + return nil } + return err } - _, err = e.clusterClient.CoreV1().ConfigMaps(client.Namespace).Create(ctx, configMap, v1.CreateOptions{}) + err = e.clusterClient.CoreV1(). + ConfigMaps(client.Namespace). + Delete(ctx, configMapName, v1.DeleteOptions{}) if err != nil { + klog.Errorf("unable to delete configMap %s", configMapName) return err } - klog.Infof("configmap %s has been Created", configmapName) + klog.Infof("configMap %s has been deleted", configMapName) return nil } diff --git a/pkg/exporter/exporter.go b/pkg/exporter/exporter.go index 9311287..539ff84 100644 --- a/pkg/exporter/exporter.go +++ b/pkg/exporter/exporter.go @@ -11,12 +11,22 @@ import ( "github.com/antihax/optional" "golang.org/x/net/context" corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/retry" "k8s.io/klog/v2" ) +var ( + constantBackoff = wait.Backoff{ + Duration: 3 * time.Second, + Steps: 10, + } +) + type Exporter struct { clusterClient clientset.Interface apiClient *client.APIClient @@ -32,36 +42,101 @@ func New(clusterClient clientset.Interface, apiClient *client.APIClient, recorde return b, nil } -func (e *Exporter) Run(ctx context.Context, configmapName, region string, patrolInterval time.Duration, stopChan <-chan struct{}) { - go wait.Until(func() { - e.Patrol(ctx, configmapName, region) - }, patrolInterval, stopChan) -} - -func (e *Exporter) Patrol(ctx context.Context, configmapName, region string) { - forecast, err := e.getCurrentForecastData(ctx, region) +func (e *Exporter) Run(ctx context.Context, configMapName, region string, patrolInterval time.Duration, stopChan <-chan struct{}) { + // create configMap first time + err := e.RefreshData(ctx, configMapName, region, stopChan) if err != nil { return } - err = e.CreateOrUpdateConfigMap(ctx, configmapName, forecast) + configMapWatch := e.GetGonfigMapWatch(ctx, configMapName) + + // configMapWatch is reassigned when the config map is deleted, + // so we want to make sure to stop the last instance. + defer func() { + configMapWatch.Stop() + }() + + refreshPatrol := time.NewTicker(patrolInterval) + defer refreshPatrol.Stop() + + for { + select { + // if the configMap got deleted by user + case event := <-configMapWatch.ResultChan(): + if event.Type == watch.Deleted { + err := e.RefreshData(ctx, configMapName, region, stopChan) + if err != nil { + return + } + // refresh watch after deletion + configMapWatch.Stop() + configMapWatch = e.GetGonfigMapWatch(ctx, configMapName) + + e.recorder.Eventf(&corev1.ObjectReference{ + Kind: "Pod", + Namespace: client.Namespace, + Name: client.PodName, + }, corev1.EventTypeWarning, "Configmap Deleted", "Configmap got deleted") + } + + // if refresh time elapsed + case <-refreshPatrol.C: + err := e.RefreshData(ctx, configMapName, region, stopChan) + if err != nil { + return + } + + // refresh watch after deletion + configMapWatch.Stop() + configMapWatch = e.GetGonfigMapWatch(ctx, configMapName) + + e.recorder.Eventf(&corev1.ObjectReference{ + Kind: "Pod", + Namespace: client.Namespace, + Name: client.PodName, + }, corev1.EventTypeNormal, "Configmap updated", "Configmap gets updated") + + // context got canceled or done + case <-ctx.Done(): + case <-stopChan: + return + } + } +} + +func (e *Exporter) RefreshData(ctx context.Context, configMapName string, region string, stopChan <-chan struct{}) error { + err := e.DeleteConfigmap(ctx, configMapName) + if err != nil && !apierrors.IsNotFound(err) { // if configMap is not found, + return err + } + + retry.OnError(constantBackoff, func(err error) bool { + return true + }, func() error { + forecast, err := e.getCurrentForecastData(ctx, region, stopChan) + if err != nil { + return err + } + return e.CreateOrUpdateConfigMap(ctx, configMapName, forecast) + }) if err != nil { e.recorder.Eventf(&corev1.ObjectReference{ Kind: "Pod", Namespace: client.Namespace, Name: client.PodName, - }, corev1.EventTypeWarning, "Configmap Create", "Error while creating configmap") - klog.Errorf("an error has occurred while creating %s configmap, %s", configmapName, err.Error()) - return + }, corev1.EventTypeWarning, "Configmap Create", "Error while creating configMap") + klog.Errorf("an error has occurred while creating %s configMap, err: %s", configMapName, err.Error()) + return err } e.recorder.Eventf(&corev1.ObjectReference{ Kind: "Pod", Namespace: client.Namespace, Name: client.PodName, }, corev1.EventTypeNormal, "Exporter results", "Done retrieve data") - + return nil } -func (e *Exporter) getCurrentForecastData(ctx context.Context, region string) ([]client.EmissionsForecastDto, error) { +func (e *Exporter) getCurrentForecastData(ctx context.Context, region string, stopChan <-chan struct{}) ([]client.EmissionsForecastDto, error) { opt := &client.CarbonAwareApiGetCurrentForecastDataOpts{ DataStartAt: optional.EmptyTime(), DataEndAt: optional.EmptyTime(),