Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
FROM --platform=${TARGETPLATFORM:-linux/amd64} flant/jq:b6be13d5-musl as libjq

# Go builder.
FROM --platform=${TARGETPLATFORM:-linux/amd64} golang:1.19-alpine3.16 AS builder
FROM --platform=${TARGETPLATFORM:-linux/amd64} golang:1.20-alpine3.16 AS builder

ARG appVersion=latest
RUN apk --no-cache add git ca-certificates gcc musl-dev libc-dev binutils-gold
Expand All @@ -18,7 +18,7 @@ ADD . /app
# Clone shell-operator to get frameworks
RUN git clone https://github.com/flant/shell-operator shell-operator-clone && \
cd shell-operator-clone && \
git checkout v1.1.3
git checkout v1.4.10

RUN shellOpVer=$(go list -m all | grep shell-operator | cut -d' ' -f 2-) \
CGO_ENABLED=1 \
Expand Down
4 changes: 0 additions & 4 deletions cmd/addon-operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package main
import (
"context"
"fmt"
"math/rand"
"os"
"strings"
"syscall"
Expand Down Expand Up @@ -66,9 +65,6 @@ func main() {
func start(_ *kingpin.ParseContext) error {
sh_app.AppStartMessage = fmt.Sprintf("%s %s, shell-operator %s", app.AppName, app.Version, sh_app.Version)

// Init rand generator.
rand.Seed(time.Now().UnixNano())

ctx := context.Background()

operator := addon_operator.NewAddonOperator(ctx)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/flant/addon-operator

go 1.19
go 1.20

require (
github.com/davecgh/go-spew v1.1.1
Expand Down
22 changes: 14 additions & 8 deletions pkg/addon-operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1091,28 +1091,34 @@ func (op *AddonOperator) StartModuleManagerEventHandler() {
default:
}

case absentResourcesEvent := <-op.HelmResourcesManager.Ch():
case HelmReleaseStatusEvent := <-op.HelmResourcesManager.Ch():
logLabels := map[string]string{
"event.id": uuid.Must(uuid.NewV4()).String(),
"module": absentResourcesEvent.ModuleName,
"module": HelmReleaseStatusEvent.ModuleName,
}
eventLogEntry := logEntry.WithFields(utils.LabelsToLogFields(logLabels))

// Do not add ModuleRun task if it is already queued.
hasTask := QueueHasPendingModuleRunTask(op.engine.TaskQueues.GetMain(), absentResourcesEvent.ModuleName)
hasTask := QueueHasPendingModuleRunTask(op.engine.TaskQueues.GetMain(), HelmReleaseStatusEvent.ModuleName)
eventDescription := "AbsentHelmResourcesDetected"
additionalDescription := fmt.Sprintf("%d absent module resources", len(HelmReleaseStatusEvent.Absent))
if HelmReleaseStatusEvent.UnexpectedStatus {
eventDescription = "HelmReleaseUnexpectedStatus"
additionalDescription = "unexpected helm release status"
}

if !hasTask {
newTask := sh_task.NewTask(task.ModuleRun).
WithLogLabels(logLabels).
WithQueueName("main").
WithMetadata(task.HookMetadata{
EventDescription: "DetectAbsentHelmResources",
ModuleName: absentResourcesEvent.ModuleName,
EventDescription: eventDescription,
ModuleName: HelmReleaseStatusEvent.ModuleName,
})
op.engine.TaskQueues.GetMain().AddLast(newTask.WithQueuedAt(time.Now()))
taskAddDescription := fmt.Sprintf("got %d absent module resources, append", len(absentResourcesEvent.Absent))
op.logTaskAdd(logEntry, taskAddDescription, newTask)
op.logTaskAdd(logEntry, fmt.Sprintf("detected %s, append", additionalDescription), newTask)
} else {
eventLogEntry.WithField("task.flow", "noop").Infof("Got %d absent module resources, ModuleRun task already queued", len(absentResourcesEvent.Absent))
eventLogEntry.WithField("task.flow", "noop").Infof("Detected %s, ModuleRun task already queued", additionalDescription)
}
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/helm/helm3/helm3.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ func (h *Helm3Client) DeleteRelease(releaseName string) (err error) {
return fmt.Errorf("helm uninstall %s invocation error: %v\n%v %v", releaseName, err, stdout, stderr)
}

h.LogEntry.Debugf("helm release %s deleted", releaseName)
return
}

Expand Down
38 changes: 36 additions & 2 deletions pkg/helm/helm3lib/helm3lib.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package helm3lib

import (
"context"
"fmt"
"os"
"sort"
Expand All @@ -15,7 +16,10 @@ import (
"helm.sh/helm/v3/pkg/cli"
"helm.sh/helm/v3/pkg/release"
"helm.sh/helm/v3/pkg/releaseutil"
"helm.sh/helm/v3/pkg/storage"
"helm.sh/helm/v3/pkg/storage/driver"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/client-go/rest"

Expand Down Expand Up @@ -141,7 +145,7 @@ func (h *LibClient) UpgradeRelease(releaseName string, chartName string, valuesP
}
return h.upgradeRelease(releaseName, chartName, valuesPaths, setValues, namespace)
}

h.LogEntry.Debugf("helm release %s upgraded", releaseName)
return nil
}

Expand Down Expand Up @@ -209,7 +213,34 @@ func (h *LibClient) upgradeRelease(releaseName string, chartName string, valuesP
nsReleaseName := fmt.Sprintf("%s/%s", latestRelease.Namespace, latestRelease.Name)
h.LogEntry.Debugf("Latest release '%s': revision: %d has status: %s", nsReleaseName, latestRelease.Version, latestRelease.Info.Status)
if latestRelease.Info.Status.IsPending() {
h.rollbackLatestRelease(releases)
objectName := fmt.Sprintf("%s.%s.v%d", storage.HelmStorageType, latestRelease.Name, latestRelease.Version)
kubeClient, err := actionConfig.KubernetesClientSet()
if err != nil {
return fmt.Errorf("couldn't get kubernetes client set: %w", err)
}
// switch between storage types (memory, sql, secrets, configmaps) - with secrets and configmaps we can deal a bit more straightforward than doing a rollback
switch actionConfig.Releases.Name() {
case driver.ConfigMapsDriverName:
h.LogEntry.Debugf("ConfigMap for helm revision %d of release %s in status %s, driver %s: will be deleted", latestRelease.Version, nsReleaseName, latestRelease.Info.Status, driver.ConfigMapsDriverName)
err := kubeClient.CoreV1().ConfigMaps(latestRelease.Namespace).Delete(context.TODO(), objectName, metav1.DeleteOptions{})
if err != nil && !errors.IsNotFound(err) {
return fmt.Errorf("couldn't delete configmap %s of release %s: %w", objectName, nsReleaseName, err)
}
h.LogEntry.Debugf("ConfigMap %s was deleted", objectName)

case driver.SecretsDriverName:
h.LogEntry.Debugf("Secret for helm revision %d of release %s in status %s, driver %s: will be deleted", latestRelease.Version, nsReleaseName, latestRelease.Info.Status, driver.SecretsDriverName)
err := kubeClient.CoreV1().Secrets(latestRelease.Namespace).Delete(context.TODO(), objectName, metav1.DeleteOptions{})
if err != nil && !errors.IsNotFound(err) {
return fmt.Errorf("couldn't delete secret %s of release %s: %w", objectName, nsReleaseName, err)
}
h.LogEntry.Debugf("Secret %s was deleted", objectName)

default:
// memory and sql storages a bit more trickier - doing a rollback is justified
h.LogEntry.Debugf("Helm revision %d of release %s in status %s, driver %s: will be rolledback", latestRelease.Version, nsReleaseName, latestRelease.Info.Status, actionConfig.Releases.Name())
h.rollbackLatestRelease(releases)
}
}
}

Expand Down Expand Up @@ -271,6 +302,7 @@ func (h *LibClient) DeleteRelease(releaseName string) error {
return fmt.Errorf("helm uninstall %s invocation error: %v\n", releaseName, err)
}

h.LogEntry.Debugf("helm release %s deleted", releaseName)
return nil
}

Expand All @@ -288,6 +320,8 @@ func (h *LibClient) IsReleaseExists(releaseName string) (bool, error) {
// ListReleasesNames returns list of release names.
func (h *LibClient) ListReleasesNames() ([]string, error) {
l := action.NewList(actionConfig)
// list all releases regardless of their state
l.StateMask = action.ListAll
list, err := l.Run()
if err != nil {
return nil, fmt.Errorf("helm list failed: %s", err)
Expand Down
22 changes: 12 additions & 10 deletions pkg/helm_resources_manager/helm_resources_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ type HelmResourcesManager interface {
StopMonitors()
PauseMonitors()
ResumeMonitors()
StartMonitor(moduleName string, manifests []manifest.Manifest, defaultNamespace string)
StartMonitor(moduleName string, manifests []manifest.Manifest, defaultNamespace string, LastReleaseStatus func(releaseName string) (revision string, status string, err error))
HasMonitor(moduleName string) bool
StopMonitor(moduleName string)
PauseMonitor(moduleName string)
ResumeMonitor(moduleName string)
AbsentResources(moduleName string) ([]manifest.Manifest, error)
GetMonitor(moduleName string) *ResourcesMonitor
GetAbsentResources(templates []manifest.Manifest, defaultNamespace string) ([]manifest.Manifest, error)
Ch() chan AbsentResourcesEvent
Ch() chan ReleaseStatusEvent
}

type helmResourcesManager struct {
Expand All @@ -39,14 +39,14 @@ type helmResourcesManager struct {

monitors map[string]*ResourcesMonitor

eventCh chan AbsentResourcesEvent
eventCh chan ReleaseStatusEvent
}

var _ HelmResourcesManager = &helmResourcesManager{}

func NewHelmResourcesManager() HelmResourcesManager {
return &helmResourcesManager{
eventCh: make(chan AbsentResourcesEvent),
eventCh: make(chan ReleaseStatusEvent),
monitors: make(map[string]*ResourcesMonitor),
}
}
Expand All @@ -69,11 +69,11 @@ func (hm *helmResourcesManager) Stop() {
}
}

func (hm *helmResourcesManager) Ch() chan AbsentResourcesEvent {
func (hm *helmResourcesManager) Ch() chan ReleaseStatusEvent {
return hm.eventCh
}

func (hm *helmResourcesManager) StartMonitor(moduleName string, manifests []manifest.Manifest, defaultNamespace string) {
func (hm *helmResourcesManager) StartMonitor(moduleName string, manifests []manifest.Manifest, defaultNamespace string, lastReleaseStatus func(releaseName string) (revision string, status string, err error)) {
log.Debugf("Start helm resources monitor for '%s'", moduleName)
hm.StopMonitor(moduleName)

Expand All @@ -83,20 +83,22 @@ func (hm *helmResourcesManager) StartMonitor(moduleName string, manifests []mani
rm.WithModuleName(moduleName)
rm.WithManifests(manifests)
rm.WithDefaultNamespace(defaultNamespace)
rm.WithStatusGetter(lastReleaseStatus)
rm.WithAbsentCb(hm.absentResourcesCallback)

hm.monitors[moduleName] = rm
rm.Start()
}

func (hm *helmResourcesManager) absentResourcesCallback(moduleName string, absent []manifest.Manifest, defaultNs string) {
func (hm *helmResourcesManager) absentResourcesCallback(moduleName string, unexpectedStatus bool, absent []manifest.Manifest, defaultNs string) {
log.Debugf("Detect absent resources for %s", moduleName)
for _, m := range absent {
log.Debugf("%s/%s/%s", m.Namespace(defaultNs), m.Kind(), m.Name())
}
hm.eventCh <- AbsentResourcesEvent{
ModuleName: moduleName,
Absent: absent,
hm.eventCh <- ReleaseStatusEvent{
ModuleName: moduleName,
Absent: absent,
UnexpectedStatus: unexpectedStatus,
}
}

Expand Down
39 changes: 35 additions & 4 deletions pkg/helm_resources_manager/resources_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ type ResourcesMonitor struct {
kubeClient *klient.Client
logLabels map[string]string

absentCb func(moduleName string, absent []manifest.Manifest, defaultNs string)
absentCb func(moduleName string, unexpectedStatus bool, absent []manifest.Manifest, defaultNs string)

helmStatusGetter func(releaseName string) (revision string, status string, err error)
}

func NewResourcesMonitor() *ResourcesMonitor {
Expand Down Expand Up @@ -73,11 +75,15 @@ func (r *ResourcesMonitor) WithManifests(manifests []manifest.Manifest) {
r.manifests = manifests
}

func (r *ResourcesMonitor) WithAbsentCb(cb func(string, []manifest.Manifest, string)) {
func (r *ResourcesMonitor) WithAbsentCb(cb func(string, bool, []manifest.Manifest, string)) {
r.absentCb = cb
}

// Start creates a timer and check if all manifests are present in cluster.
func (r *ResourcesMonitor) WithStatusGetter(lastReleaseStatus func(releaseName string) (revision string, status string, err error)) {
r.helmStatusGetter = lastReleaseStatus
}

// Start creates a timer and check if all deployed manifests are present in the cluster.
func (r *ResourcesMonitor) Start() {
logEntry := log.WithFields(utils.LabelsToLogFields(r.logLabels)).
WithField("operator.component", "HelmResourceMonitor")
Expand All @@ -92,6 +98,19 @@ func (r *ResourcesMonitor) Start() {
if r.paused {
continue
}
// Check release status
status, err := r.GetHelmReleaseStatus(r.moduleName)
if err != nil {
logEntry.Errorf("Cannot get helm release status: %s", err)
}

if status != "deployed" {
logEntry.Debugf("Helm release %s is in unexpected status: %s", r.moduleName, status)
if r.absentCb != nil {
r.absentCb(r.moduleName, true, []manifest.Manifest{}, r.defaultNamespace)
}
}

// Check resources
absent, err := r.AbsentResources()
if err != nil {
Expand All @@ -101,7 +120,7 @@ func (r *ResourcesMonitor) Start() {
if len(absent) > 0 {
logEntry.Debug("Absent resources detected")
if r.absentCb != nil {
r.absentCb(r.moduleName, absent, r.defaultNamespace)
r.absentCb(r.moduleName, false, absent, r.defaultNamespace)
}
} else {
logEntry.Debug("No absent resources detected")
Expand All @@ -115,6 +134,18 @@ func (r *ResourcesMonitor) Start() {
}()
}

// GetHelmReleaseStatus returns last release status
func (r *ResourcesMonitor) GetHelmReleaseStatus(moduleName string) (string, error) {
logEntry := log.WithFields(utils.LabelsToLogFields(r.logLabels)).
WithField("operator.component", "HelmResourceMonitor")
revision, status, err := r.helmStatusGetter(moduleName)
if err != nil {
return "", err
}
logEntry.Debugf("Helm release %s, revision %s, status: %s", moduleName, revision, status)
return status, nil
}

// Pause prevent execution of absent callback
func (r *ResourcesMonitor) Pause() {
r.paused = true
Expand Down
4 changes: 2 additions & 2 deletions pkg/helm_resources_manager/test/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (h *MockHelmResourcesManager) PauseMonitors() {}

func (h *MockHelmResourcesManager) ResumeMonitors() {}

func (h *MockHelmResourcesManager) StartMonitor(_ string, _ []manifest.Manifest, _ string) {
func (h *MockHelmResourcesManager) StartMonitor(_ string, _ []manifest.Manifest, _ string, _ func(string) (revision string, status string, err error)) {
}

func (h *MockHelmResourcesManager) HasMonitor(_ string) bool {
Expand All @@ -49,6 +49,6 @@ func (h *MockHelmResourcesManager) GetAbsentResources(_ []manifest.Manifest, _ s
return nil, nil
}

func (h *MockHelmResourcesManager) Ch() chan AbsentResourcesEvent {
func (h *MockHelmResourcesManager) Ch() chan ReleaseStatusEvent {
return nil
}
7 changes: 4 additions & 3 deletions pkg/helm_resources_manager/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ package types

import "github.com/flant/kube-client/manifest"

type AbsentResourcesEvent struct {
ModuleName string
Absent []manifest.Manifest
type ReleaseStatusEvent struct {
ModuleName string
Absent []manifest.Manifest
UnexpectedStatus bool
}
Loading