Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Watch configMap for deletion #12

Merged
merged 3 commits into from
Apr 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,5 @@ _output

.idea

/hack/tools/bin/
/hack/tools/bin/
.DS_Store
4 changes: 1 addition & 3 deletions charts/carbon-intensity-exporter/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ apiServer:
password: password
image:
repository: ghcr.io/azure/kubernetes-carbon-intensity-exporter/server
pullPolicy: Always
pullPolicy: IfNotPresent
tag: "0.1.0"
carbonDataExporter:
name: carbon-data-exporter
Expand All @@ -32,11 +32,9 @@ rbac:
serviceAccountName: carbon-aware-sa
roleRef: cluster-admin

imagePullSecrets: []
nameOverride: ""
fullnameOverride: ""
podAnnotations: {}
podSecurityContext: {}
nodeSelector: {}
tolerations: []
affinity: {}
8 changes: 4 additions & 4 deletions cmd/exporter/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (

var (
//exporter command args
configmapName = flag.String("configmap-name", "carbon-intensity", "Configmap name - Default 'carbonIntensity'")
configMapName = flag.String("configmap-name", "carbon-intensity", "Configmap name - Default 'carbonIntensity'")
patrolInterval = flag.String("patrol-interval", "12h", "Patrol interval in hours - Default every 12 hours")
region = flag.String("region", "", "Region to get carbon intensity for - Required")
)
Expand All @@ -53,7 +53,7 @@ func NewExporterCommand(stopChan <-chan struct{}) *cobra.Command {
if err != nil {
klog.Fatalf("unable to initialize command configs: %s", err.Error())
}
if err := Run(c.Complete(), *region, *patrolInterval, stopChan); err != nil {
if err := Run(c.Complete(), stopChan); err != nil {
klog.Fatalf("unable to execute command : %s", err.Error())
}
},
Expand Down Expand Up @@ -82,7 +82,7 @@ func NewExporterCommand(stopChan <-chan struct{}) *cobra.Command {
return cmd
}

func Run(cc *exporterconfig.CompletedConfig, region string, patrolInterval string, stopCh <-chan struct{}) error {
func Run(cc *exporterconfig.CompletedConfig, stopCh <-chan struct{}) error {
// Init client SDK and exporter
apiClient := client.NewAPIClient(client.NewConfiguration())
e, err := exporter.New(cc.ClusterClient, apiClient, cc.Recorder)
Expand Down Expand Up @@ -142,7 +142,7 @@ func startExporter(p *exporter.Exporter, stopCh <-chan struct{}) func(context.Co
}

return func(ctx context.Context) {
p.Run(ctx, *configmapName, *region, ptDuration, stopCh)
p.Run(ctx, *configMapName, *region, ptDuration, stopCh)
<-ctx.Done()
}
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.19
require (
github.com/antihax/optional v1.0.0
github.com/spf13/cobra v1.6.0
github.com/spf13/pflag v1.0.5
golang.org/x/net v0.7.0
golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b
k8s.io/api v0.26.2
Expand Down Expand Up @@ -60,7 +61,6 @@ require (
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stoewer/go-strcase v1.2.0 // indirect
go.etcd.io/etcd/api/v3 v3.5.5 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.5 // indirect
Expand Down
5 changes: 2 additions & 3 deletions hack/e2e/manual-e2e-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ error() {
TMPDIR=""

cleanup() {
helm uninstall carbon-e2e
kind delete clusters -n carbon-e2e
kind delete cluster -n carbon-e2e
if [ -n "$TMPDIR" ]; then
rm -rf "$TMPDIR"
fi
Expand Down Expand Up @@ -60,7 +59,7 @@ sleep 10

kubectl delete configmap carbon-intensity -n kube-system

sleep 10
sleep 15

kubectl get configmap carbon-intensity -n kube-system
kubectl describe configmap carbon-intensity -n kube-system
Expand Down
51 changes: 34 additions & 17 deletions pkg/exporter/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/klog/v2"
)

var (
isImmutable = true
)

func (e *Exporter) CreateOrUpdateConfigMap(ctx context.Context, configmapName string, emissionForecast []client.EmissionsForecastDto) error {
func (e *Exporter) CreateOrUpdateConfigMap(ctx context.Context, configMapName string, emissionForecast []client.EmissionsForecastDto) error {
if emissionForecast == nil {
return errors.New("emission forecast cannot be nil")
}
Expand All @@ -42,7 +43,7 @@ func (e *Exporter) CreateOrUpdateConfigMap(ctx context.Context, configmapName st

configMap := &corev1.ConfigMap{
ObjectMeta: v1.ObjectMeta{
Name: configmapName,
Name: configMapName,
Namespace: client.Namespace,
},
Immutable: &isImmutable,
Expand All @@ -59,31 +60,47 @@ func (e *Exporter) CreateOrUpdateConfigMap(ctx context.Context, configmapName st
},
}

currentConfig, err := e.clusterClient.CoreV1().ConfigMaps(client.Namespace).Get(ctx, configmapName, v1.GetOptions{})
_, err = e.clusterClient.CoreV1().
ConfigMaps(client.Namespace).
Create(ctx, configMap, v1.CreateOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
klog.Infof("configmap %s is not found", configmapName)
} else {
return err
}
return err
}
klog.Infof("configMap %s has been created", configMapName)

return nil
}

// Delete the old configmap if any.
if currentConfig != nil && currentConfig.Name != "" || !apierrors.IsNotFound(err) {
// Delete it first (as it is immutable)
klog.Info("deleting current the configmap")
err = e.clusterClient.CoreV1().ConfigMaps(client.Namespace).Delete(ctx, configmapName, v1.DeleteOptions{})
if err != nil {
return err
func (e *Exporter) GetGonfigMapWatch(ctx context.Context, configMapName string) watch.Interface {
watch, err := e.clusterClient.CoreV1().
ConfigMaps(client.Namespace).
Watch(ctx, v1.ListOptions{
FieldSelector: "metadata.name=" + configMapName,
})
if err != nil {
klog.Fatalf("unable to watch configMap %s, err: %v", configMapName, err)
}
return watch
}

func (e *Exporter) DeleteConfigmap(ctx context.Context, configMapName string) error {
_, err := e.clusterClient.CoreV1().ConfigMaps(client.Namespace).Get(ctx, configMapName, v1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) { // if configMap is not found, no errors will be returned.
return nil
}
return err
}

_, err = e.clusterClient.CoreV1().ConfigMaps(client.Namespace).Create(ctx, configMap, v1.CreateOptions{})
err = e.clusterClient.CoreV1().
ConfigMaps(client.Namespace).
Delete(ctx, configMapName, v1.DeleteOptions{})
if err != nil {
klog.Errorf("unable to delete configMap %s", configMapName)
return err
}
klog.Infof("configmap %s has been Created", configmapName)

klog.Infof("configMap %s has been deleted", configMapName)
return nil
}

Expand Down
103 changes: 89 additions & 14 deletions pkg/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,22 @@ import (
"github.com/antihax/optional"
"golang.org/x/net/context"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
)

var (
constantBackoff = wait.Backoff{
Duration: 3 * time.Second,
Steps: 10,
}
)

type Exporter struct {
clusterClient clientset.Interface
apiClient *client.APIClient
Expand All @@ -32,36 +42,101 @@ func New(clusterClient clientset.Interface, apiClient *client.APIClient, recorde
return b, nil
}

func (e *Exporter) Run(ctx context.Context, configmapName, region string, patrolInterval time.Duration, stopChan <-chan struct{}) {
go wait.Until(func() {
e.Patrol(ctx, configmapName, region)
}, patrolInterval, stopChan)
}

func (e *Exporter) Patrol(ctx context.Context, configmapName, region string) {
forecast, err := e.getCurrentForecastData(ctx, region)
func (e *Exporter) Run(ctx context.Context, configMapName, region string, patrolInterval time.Duration, stopChan <-chan struct{}) {
// create configMap first time
err := e.RefreshData(ctx, configMapName, region, stopChan)
if err != nil {
return
}
err = e.CreateOrUpdateConfigMap(ctx, configmapName, forecast)
configMapWatch := e.GetGonfigMapWatch(ctx, configMapName)

// configMapWatch is reassigned when the config map is deleted,
// so we want to make sure to stop the last instance.
defer func() {
configMapWatch.Stop()
}()

refreshPatrol := time.NewTicker(patrolInterval)
defer refreshPatrol.Stop()

for {
select {
// if the configMap got deleted by user
case event := <-configMapWatch.ResultChan():
if event.Type == watch.Deleted {
err := e.RefreshData(ctx, configMapName, region, stopChan)
if err != nil {
return
}
// refresh watch after deletion
configMapWatch.Stop()
configMapWatch = e.GetGonfigMapWatch(ctx, configMapName)

e.recorder.Eventf(&corev1.ObjectReference{
Kind: "Pod",
Namespace: client.Namespace,
Name: client.PodName,
}, corev1.EventTypeWarning, "Configmap Deleted", "Configmap got deleted")
}

// if refresh time elapsed
case <-refreshPatrol.C:
err := e.RefreshData(ctx, configMapName, region, stopChan)
if err != nil {
return
}

// refresh watch after deletion
configMapWatch.Stop()
configMapWatch = e.GetGonfigMapWatch(ctx, configMapName)

e.recorder.Eventf(&corev1.ObjectReference{
Kind: "Pod",
Namespace: client.Namespace,
Name: client.PodName,
}, corev1.EventTypeNormal, "Configmap updated", "Configmap gets updated")

// context got canceled or done
case <-ctx.Done():
case <-stopChan:
return
}
}
}

func (e *Exporter) RefreshData(ctx context.Context, configMapName string, region string, stopChan <-chan struct{}) error {
err := e.DeleteConfigmap(ctx, configMapName)
if err != nil && !apierrors.IsNotFound(err) { // if configMap is not found,
return err
}

retry.OnError(constantBackoff, func(err error) bool {
return true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will returning true cause always retry?

Copy link
Contributor Author

@helayoty helayoty Apr 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. No specific error. We want to retry for all errors.

}, func() error {
forecast, err := e.getCurrentForecastData(ctx, region, stopChan)
if err != nil {
return err
}
return e.CreateOrUpdateConfigMap(ctx, configMapName, forecast)
})
if err != nil {
e.recorder.Eventf(&corev1.ObjectReference{
Kind: "Pod",
Namespace: client.Namespace,
Name: client.PodName,
}, corev1.EventTypeWarning, "Configmap Create", "Error while creating configmap")
klog.Errorf("an error has occurred while creating %s configmap, %s", configmapName, err.Error())
return
}, corev1.EventTypeWarning, "Configmap Create", "Error while creating configMap")
klog.Errorf("an error has occurred while creating %s configMap, err: %s", configMapName, err.Error())
return err
}
e.recorder.Eventf(&corev1.ObjectReference{
Kind: "Pod",
Namespace: client.Namespace,
Name: client.PodName,
}, corev1.EventTypeNormal, "Exporter results", "Done retrieve data")

return nil
}

func (e *Exporter) getCurrentForecastData(ctx context.Context, region string) ([]client.EmissionsForecastDto, error) {
func (e *Exporter) getCurrentForecastData(ctx context.Context, region string, stopChan <-chan struct{}) ([]client.EmissionsForecastDto, error) {
opt := &client.CarbonAwareApiGetCurrentForecastDataOpts{
DataStartAt: optional.EmptyTime(),
DataEndAt: optional.EmptyTime(),
Expand Down