Skip to content

Commit

Permalink
feat: Watch configMap for deletion (#12)
Browse files Browse the repository at this point in the history
Signed-off-by: Heba Elayoty <hebaelayoty@gmail.com>
  • Loading branch information
helayoty committed Apr 5, 2023
1 parent cc1c010 commit 3e8dd74
Show file tree
Hide file tree
Showing 7 changed files with 133 additions and 43 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,5 @@ _output

.idea

/hack/tools/bin/
/hack/tools/bin/
.DS_Store
4 changes: 1 addition & 3 deletions charts/carbon-intensity-exporter/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ apiServer:
password: password
image:
repository: ghcr.io/azure/kubernetes-carbon-intensity-exporter/server
pullPolicy: Always
pullPolicy: IfNotPresent
tag: "0.1.0"
carbonDataExporter:
name: carbon-data-exporter
Expand All @@ -32,11 +32,9 @@ rbac:
serviceAccountName: carbon-aware-sa
roleRef: cluster-admin

imagePullSecrets: []
nameOverride: ""
fullnameOverride: ""
podAnnotations: {}
podSecurityContext: {}
nodeSelector: {}
tolerations: []
affinity: {}
8 changes: 4 additions & 4 deletions cmd/exporter/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (

var (
//exporter command args
configmapName = flag.String("configmap-name", "carbon-intensity", "Configmap name - Default 'carbonIntensity'")
configMapName = flag.String("configmap-name", "carbon-intensity", "Configmap name - Default 'carbonIntensity'")
patrolInterval = flag.String("patrol-interval", "12h", "Patrol interval in hours - Default every 12 hours")
region = flag.String("region", "", "Region to get carbon intensity for - Required")
)
Expand All @@ -53,7 +53,7 @@ func NewExporterCommand(stopChan <-chan struct{}) *cobra.Command {
if err != nil {
klog.Fatalf("unable to initialize command configs: %s", err.Error())
}
if err := Run(c.Complete(), *region, *patrolInterval, stopChan); err != nil {
if err := Run(c.Complete(), stopChan); err != nil {
klog.Fatalf("unable to execute command : %s", err.Error())
}
},
Expand Down Expand Up @@ -82,7 +82,7 @@ func NewExporterCommand(stopChan <-chan struct{}) *cobra.Command {
return cmd
}

func Run(cc *exporterconfig.CompletedConfig, region string, patrolInterval string, stopCh <-chan struct{}) error {
func Run(cc *exporterconfig.CompletedConfig, stopCh <-chan struct{}) error {
// Init client SDK and exporter
apiClient := client.NewAPIClient(client.NewConfiguration())
e, err := exporter.New(cc.ClusterClient, apiClient, cc.Recorder)
Expand Down Expand Up @@ -142,7 +142,7 @@ func startExporter(p *exporter.Exporter, stopCh <-chan struct{}) func(context.Co
}

return func(ctx context.Context) {
p.Run(ctx, *configmapName, *region, ptDuration, stopCh)
p.Run(ctx, *configMapName, *region, ptDuration, stopCh)
<-ctx.Done()
}
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.19
require (
github.com/antihax/optional v1.0.0
github.com/spf13/cobra v1.6.0
github.com/spf13/pflag v1.0.5
golang.org/x/net v0.7.0
golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b
k8s.io/api v0.26.2
Expand Down Expand Up @@ -60,7 +61,6 @@ require (
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stoewer/go-strcase v1.2.0 // indirect
go.etcd.io/etcd/api/v3 v3.5.5 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.5 // indirect
Expand Down
5 changes: 2 additions & 3 deletions hack/e2e/manual-e2e-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ error() {
TMPDIR=""

cleanup() {
helm uninstall carbon-e2e
kind delete clusters -n carbon-e2e
kind delete cluster -n carbon-e2e
if [ -n "$TMPDIR" ]; then
rm -rf "$TMPDIR"
fi
Expand Down Expand Up @@ -60,7 +59,7 @@ sleep 10

kubectl delete configmap carbon-intensity -n kube-system

sleep 10
sleep 15

kubectl get configmap carbon-intensity -n kube-system
kubectl describe configmap carbon-intensity -n kube-system
Expand Down
51 changes: 34 additions & 17 deletions pkg/exporter/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/klog/v2"
)

var (
isImmutable = true
)

func (e *Exporter) CreateOrUpdateConfigMap(ctx context.Context, configmapName string, emissionForecast []client.EmissionsForecastDto) error {
func (e *Exporter) CreateOrUpdateConfigMap(ctx context.Context, configMapName string, emissionForecast []client.EmissionsForecastDto) error {
if emissionForecast == nil {
return errors.New("emission forecast cannot be nil")
}
Expand All @@ -42,7 +43,7 @@ func (e *Exporter) CreateOrUpdateConfigMap(ctx context.Context, configmapName st

configMap := &corev1.ConfigMap{
ObjectMeta: v1.ObjectMeta{
Name: configmapName,
Name: configMapName,
Namespace: client.Namespace,
},
Immutable: &isImmutable,
Expand All @@ -59,31 +60,47 @@ func (e *Exporter) CreateOrUpdateConfigMap(ctx context.Context, configmapName st
},
}

currentConfig, err := e.clusterClient.CoreV1().ConfigMaps(client.Namespace).Get(ctx, configmapName, v1.GetOptions{})
_, err = e.clusterClient.CoreV1().
ConfigMaps(client.Namespace).
Create(ctx, configMap, v1.CreateOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
klog.Infof("configmap %s is not found", configmapName)
} else {
return err
}
return err
}
klog.Infof("configMap %s has been created", configMapName)

return nil
}

// Delete the old configmap if any.
if currentConfig != nil && currentConfig.Name != "" || !apierrors.IsNotFound(err) {
// Delete it first (as it is immutable)
klog.Info("deleting current the configmap")
err = e.clusterClient.CoreV1().ConfigMaps(client.Namespace).Delete(ctx, configmapName, v1.DeleteOptions{})
if err != nil {
return err
func (e *Exporter) GetGonfigMapWatch(ctx context.Context, configMapName string) watch.Interface {
watch, err := e.clusterClient.CoreV1().
ConfigMaps(client.Namespace).
Watch(ctx, v1.ListOptions{
FieldSelector: "metadata.name=" + configMapName,
})
if err != nil {
klog.Fatalf("unable to watch configMap %s, err: %v", configMapName, err)
}
return watch
}

func (e *Exporter) DeleteConfigmap(ctx context.Context, configMapName string) error {
_, err := e.clusterClient.CoreV1().ConfigMaps(client.Namespace).Get(ctx, configMapName, v1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) { // if configMap is not found, no errors will be returned.
return nil
}
return err
}

_, err = e.clusterClient.CoreV1().ConfigMaps(client.Namespace).Create(ctx, configMap, v1.CreateOptions{})
err = e.clusterClient.CoreV1().
ConfigMaps(client.Namespace).
Delete(ctx, configMapName, v1.DeleteOptions{})
if err != nil {
klog.Errorf("unable to delete configMap %s", configMapName)
return err
}
klog.Infof("configmap %s has been Created", configmapName)

klog.Infof("configMap %s has been deleted", configMapName)
return nil
}

Expand Down
103 changes: 89 additions & 14 deletions pkg/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,22 @@ import (
"github.com/antihax/optional"
"golang.org/x/net/context"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
)

var (
constantBackoff = wait.Backoff{
Duration: 3 * time.Second,
Steps: 10,
}
)

type Exporter struct {
clusterClient clientset.Interface
apiClient *client.APIClient
Expand All @@ -32,36 +42,101 @@ func New(clusterClient clientset.Interface, apiClient *client.APIClient, recorde
return b, nil
}

func (e *Exporter) Run(ctx context.Context, configmapName, region string, patrolInterval time.Duration, stopChan <-chan struct{}) {
go wait.Until(func() {
e.Patrol(ctx, configmapName, region)
}, patrolInterval, stopChan)
}

func (e *Exporter) Patrol(ctx context.Context, configmapName, region string) {
forecast, err := e.getCurrentForecastData(ctx, region)
func (e *Exporter) Run(ctx context.Context, configMapName, region string, patrolInterval time.Duration, stopChan <-chan struct{}) {
// create configMap first time
err := e.RefreshData(ctx, configMapName, region, stopChan)
if err != nil {
return
}
err = e.CreateOrUpdateConfigMap(ctx, configmapName, forecast)
configMapWatch := e.GetGonfigMapWatch(ctx, configMapName)

// configMapWatch is reassigned when the config map is deleted,
// so we want to make sure to stop the last instance.
defer func() {
configMapWatch.Stop()
}()

refreshPatrol := time.NewTicker(patrolInterval)
defer refreshPatrol.Stop()

for {
select {
// if the configMap got deleted by user
case event := <-configMapWatch.ResultChan():
if event.Type == watch.Deleted {
err := e.RefreshData(ctx, configMapName, region, stopChan)
if err != nil {
return
}
// refresh watch after deletion
configMapWatch.Stop()
configMapWatch = e.GetGonfigMapWatch(ctx, configMapName)

e.recorder.Eventf(&corev1.ObjectReference{
Kind: "Pod",
Namespace: client.Namespace,
Name: client.PodName,
}, corev1.EventTypeWarning, "Configmap Deleted", "Configmap got deleted")
}

// if refresh time elapsed
case <-refreshPatrol.C:
err := e.RefreshData(ctx, configMapName, region, stopChan)
if err != nil {
return
}

// refresh watch after deletion
configMapWatch.Stop()
configMapWatch = e.GetGonfigMapWatch(ctx, configMapName)

e.recorder.Eventf(&corev1.ObjectReference{
Kind: "Pod",
Namespace: client.Namespace,
Name: client.PodName,
}, corev1.EventTypeNormal, "Configmap updated", "Configmap gets updated")

// context got canceled or done
case <-ctx.Done():
case <-stopChan:
return
}
}
}

func (e *Exporter) RefreshData(ctx context.Context, configMapName string, region string, stopChan <-chan struct{}) error {
err := e.DeleteConfigmap(ctx, configMapName)
if err != nil && !apierrors.IsNotFound(err) { // if configMap is not found,
return err
}

retry.OnError(constantBackoff, func(err error) bool {
return true
}, func() error {
forecast, err := e.getCurrentForecastData(ctx, region, stopChan)
if err != nil {
return err
}
return e.CreateOrUpdateConfigMap(ctx, configMapName, forecast)
})
if err != nil {
e.recorder.Eventf(&corev1.ObjectReference{
Kind: "Pod",
Namespace: client.Namespace,
Name: client.PodName,
}, corev1.EventTypeWarning, "Configmap Create", "Error while creating configmap")
klog.Errorf("an error has occurred while creating %s configmap, %s", configmapName, err.Error())
return
}, corev1.EventTypeWarning, "Configmap Create", "Error while creating configMap")
klog.Errorf("an error has occurred while creating %s configMap, err: %s", configMapName, err.Error())
return err
}
e.recorder.Eventf(&corev1.ObjectReference{
Kind: "Pod",
Namespace: client.Namespace,
Name: client.PodName,
}, corev1.EventTypeNormal, "Exporter results", "Done retrieve data")

return nil
}

func (e *Exporter) getCurrentForecastData(ctx context.Context, region string) ([]client.EmissionsForecastDto, error) {
func (e *Exporter) getCurrentForecastData(ctx context.Context, region string, stopChan <-chan struct{}) ([]client.EmissionsForecastDto, error) {
opt := &client.CarbonAwareApiGetCurrentForecastDataOpts{
DataStartAt: optional.EmptyTime(),
DataEndAt: optional.EmptyTime(),
Expand Down

0 comments on commit 3e8dd74

Please sign in to comment.