Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Prevent EventBus with clients connected from being deleted #1066

Merged
merged 5 commits into from
Feb 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 13 additions & 6 deletions controllers/eventbus/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"github.com/argoproj/argo-events/common/logging"
"github.com/argoproj/argo-events/controllers/eventbus"
"github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1"
eventsourcev1alpha1 "github.com/argoproj/argo-events/pkg/apis/eventsource/v1alpha1"
sensorv1alpha1 "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
)

const (
Expand Down Expand Up @@ -62,22 +64,27 @@ func main() {
}

// Readyness probe
err = mgr.AddReadyzCheck("readiness", healthz.Ping)
if err != nil {
if err := mgr.AddReadyzCheck("readiness", healthz.Ping); err != nil {
logger.Fatalw("unable add a readiness check", zap.Error(err))
}

// Liveness probe
err = mgr.AddHealthzCheck("liveness", healthz.Ping)
if err != nil {
if err := mgr.AddHealthzCheck("liveness", healthz.Ping); err != nil {
logger.Fatalw("unable add a health check", zap.Error(err))
}

err = v1alpha1.AddToScheme(mgr.GetScheme())
if err != nil {
if err := v1alpha1.AddToScheme(mgr.GetScheme()); err != nil {
logger.Fatalw("unable to add scheme", zap.Error(err))
}

if err := eventsourcev1alpha1.AddToScheme(mgr.GetScheme()); err != nil {
logger.Fatalw("unable to add EventSource scheme", zap.Error(err))
}

if err := sensorv1alpha1.AddToScheme(mgr.GetScheme()); err != nil {
logger.Fatalw("unable to add Sensor scheme", zap.Error(err))
}

// A controller with DefaultControllerRateLimiter
c, err := controller.New(eventbus.ControllerName, mgr, controller.Options{
Reconciler: eventbus.NewReconciler(mgr.GetClient(), mgr.GetScheme(), natsStreamingImage, natsMetricsImage, logger),
Expand Down
6 changes: 3 additions & 3 deletions controllers/eventbus/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ func (r *reconciler) reconcile(ctx context.Context, eventBus *v1alpha1.EventBus)
log.Info("deleting eventbus")
if controllerutil.ContainsFinalizer(eventBus, finalizerName) {
// Finalizer logic should be added here.
if err := installer.Uninstall(eventBus, r.client, r.natsStreamingImage, r.natsMetricsImage, log); err != nil {
if err := installer.Uninstall(ctx, eventBus, r.client, r.natsStreamingImage, r.natsMetricsImage, log); err != nil {
log.Errorw("failed to uninstall", zap.Error(err))
return nil
return err
}
controllerutil.RemoveFinalizer(eventBus, finalizerName)
}
Expand All @@ -87,7 +87,7 @@ func (r *reconciler) reconcile(ctx context.Context, eventBus *v1alpha1.EventBus)
eventBus.Status.MarkDeployFailed("InvalidSpec", err.Error())
return err
}
return installer.Install(eventBus, r.client, r.natsStreamingImage, r.natsMetricsImage, log)
return installer.Install(ctx, eventBus, r.client, r.natsStreamingImage, r.natsMetricsImage, log)
}

func (r *reconciler) needsUpdate(old, new *v1alpha1.EventBus) bool {
Expand Down
5 changes: 3 additions & 2 deletions controllers/eventbus/installer/exotic_nats.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package installer

import (
"context"
"errors"

"go.uber.org/zap"
Expand All @@ -23,7 +24,7 @@ func NewExoticNATSInstaller(eventBus *v1alpha1.EventBus, logger *zap.SugaredLogg
}
}

func (i *exoticNATSInstaller) Install() (*v1alpha1.BusConfig, error) {
func (i *exoticNATSInstaller) Install(ctx context.Context) (*v1alpha1.BusConfig, error) {
natsObj := i.eventBus.Spec.NATS
if natsObj == nil || natsObj.Exotic == nil {
return nil, errors.New("invalid request")
Expand All @@ -37,7 +38,7 @@ func (i *exoticNATSInstaller) Install() (*v1alpha1.BusConfig, error) {
return busConfig, nil
}

func (i *exoticNATSInstaller) Uninstall() error {
func (i *exoticNATSInstaller) Uninstall(ctx context.Context) error {
i.logger.Info("nothing to uninstall")
return nil
}
7 changes: 4 additions & 3 deletions controllers/eventbus/installer/exotic_nats_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package installer

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -38,17 +39,17 @@ var (
func TestInstallationExotic(t *testing.T) {
t.Run("installation with exotic nats config", func(t *testing.T) {
installer := NewExoticNATSInstaller(testExoticBus, logging.NewArgoEventsLogger())
conf, err := installer.Install()
conf, err := installer.Install(context.TODO())
assert.NoError(t, err)
assert.NotNil(t, conf.NATS)
assert.Equal(t, conf.NATS.URL, testExoticURL)
})
}

func TestUNinstallationExotic(t *testing.T) {
func TestUninstallationExotic(t *testing.T) {
t.Run("uninstallation with exotic nats config", func(t *testing.T) {
installer := NewExoticNATSInstaller(testExoticBus, logging.NewArgoEventsLogger())
err := installer.Uninstall()
err := installer.Uninstall(context.TODO())
assert.NoError(t, err)
})
}
82 changes: 73 additions & 9 deletions controllers/eventbus/installer/installer.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,34 @@
package installer

import (
"errors"
"context"

"github.com/pkg/errors"
"go.uber.org/zap"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/argoproj/argo-events/common"
"github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1"
eventsourcev1alpha1 "github.com/argoproj/argo-events/pkg/apis/eventsource/v1alpha1"
sensorv1alpha1 "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
)

// Installer is an interface for event bus installation
type Installer interface {
Install() (*v1alpha1.BusConfig, error)
Install(ctx context.Context) (*v1alpha1.BusConfig, error)
// Uninsall only needs to handle those resources not cascade deleted.
// For example, undeleted PVCs not automatically deleted when deleting a StatefulSet
Uninstall() error
Uninstall(ctx context.Context) error
}

// Install function installs the event bus
func Install(eventBus *v1alpha1.EventBus, client client.Client, natsStreamingImage, natsMetricsImage string, logger *zap.SugaredLogger) error {
func Install(ctx context.Context, eventBus *v1alpha1.EventBus, client client.Client, natsStreamingImage, natsMetricsImage string, logger *zap.SugaredLogger) error {
installer, err := getInstaller(eventBus, client, natsStreamingImage, natsMetricsImage, logger)
if err != nil {
logger.Desugar().Error("failed to an installer", zap.Error(err))
return err
}
busConfig, err := installer.Install()
busConfig, err := installer.Install(ctx)
if err != nil {
logger.Desugar().Error("installation error", zap.Error(err))
return err
Expand Down Expand Up @@ -54,16 +57,77 @@ func getLabels(bus *v1alpha1.EventBus) map[string]string {
}
}

// Uninstall function uninstalls the extra resources who were not cleaned up
// when an eventbus was deleted. Most of the time this is not needed as all
// Uninstall function will be run before the EventBus object is deleted,
// usually it could be used to uninstall the extra resources who would not be cleaned
// up when an EventBus is deleted. Most of the time this is not needed as all
// the dependency resources should have been deleted by owner references cascade
// deletion, but things like PVC created by StatefulSet need to be cleaned up
// separately.
func Uninstall(eventBus *v1alpha1.EventBus, client client.Client, natsStreamingImage, natsMetricsImage string, logger *zap.SugaredLogger) error {
//
// It could also be used to check if the EventBus object can be safely deleted.
func Uninstall(ctx context.Context, eventBus *v1alpha1.EventBus, client client.Client, natsStreamingImage, natsMetricsImage string, logger *zap.SugaredLogger) error {
linkedEventSources, err := linkedEventSources(ctx, eventBus.Namespace, eventBus.Name, client)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check if there's any EventSource or Sensor connected, if true, return an error, which will re-queue the request.

if err != nil {
logger.Errorw("failed to query linked EventSources", zap.Error(err))
return errors.Wrap(err, "failed to check if there is any EventSource linked")
}
if linkedEventSources > 0 {
return errors.Errorf("Can not delete an EventBus with %v EventSources connected", linkedEventSources)
}

linkedSensors, err := linkedSensors(ctx, eventBus.Namespace, eventBus.Name, client)
if err != nil {
logger.Errorw("failed to query linked Sensors", zap.Error(err))
return errors.Wrap(err, "failed to check if there is any Sensor linked")
}
if linkedSensors > 0 {
return errors.Errorf("Can not delete an EventBus with %v Sensors connected", linkedSensors)
}

installer, err := getInstaller(eventBus, client, natsStreamingImage, natsMetricsImage, logger)
if err != nil {
logger.Desugar().Error("failed to get an installer", zap.Error(err))
return err
}
return installer.Uninstall()
return installer.Uninstall(ctx)
}

func linkedEventSources(ctx context.Context, namespace, eventBusName string, c client.Client) (int, error) {
esl := &eventsourcev1alpha1.EventSourceList{}
if err := c.List(ctx, esl, &client.ListOptions{
Namespace: namespace,
}); err != nil {
return 0, err
}
result := 0
for _, es := range esl.Items {
ebName := es.Spec.EventBusName
if ebName == "" {
ebName = common.DefaultEventBusName
}
if ebName == eventBusName {
result++
}
}
return result, nil
}

func linkedSensors(ctx context.Context, namespace, eventBusName string, c client.Client) (int, error) {
sl := &sensorv1alpha1.SensorList{}
if err := c.List(ctx, sl, &client.ListOptions{
Namespace: namespace,
}); err != nil {
return 0, err
}
result := 0
for _, s := range sl.Items {
sName := s.Spec.EventBusName
if sName == "" {
sName = common.DefaultEventBusName
}
if sName == eventBusName {
result++
}
}
return result, nil
}
90 changes: 90 additions & 0 deletions controllers/eventbus/installer/installer_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
package installer

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"

"github.com/argoproj/argo-events/common/logging"
eventsourcev1alpha1 "github.com/argoproj/argo-events/pkg/apis/eventsource/v1alpha1"
"github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
sensorv1alpha1 "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
)

func TestGetInstaller(t *testing.T) {
Expand All @@ -23,3 +31,85 @@ func TestGetInstaller(t *testing.T) {
assert.True(t, ok)
})
}

func init() {
_ = eventsourcev1alpha1.AddToScheme(scheme.Scheme)
_ = sensorv1alpha1.AddToScheme(scheme.Scheme)
}

func TestGetLinkedEventSources(t *testing.T) {
t.Run("get linked eventsources", func(t *testing.T) {
es := fakeEmptyEventSource()
es.Spec.EventBusName = "test-sa"
es.Spec.Calendar = fakeCalendarEventSourceMap("test")
cl := fake.NewClientBuilder().Build()
ctx := context.Background()
err := cl.Create(ctx, es, &client.CreateOptions{})
assert.Nil(t, err)
n, err := linkedEventSources(ctx, testNamespace, "test-sa", cl)
assert.Nil(t, err)
assert.Equal(t, n, 1)
})
}

func TestGetLinkedSensors(t *testing.T) {
t.Run("get linked sensors", func(t *testing.T) {
s := fakeSensor()
s.Spec.EventBusName = "test-sa"
cl := fake.NewClientBuilder().Build()
ctx := context.Background()
err := cl.Create(ctx, s, &client.CreateOptions{})
assert.Nil(t, err)
n, err := linkedSensors(ctx, testNamespace, "test-sa", cl)
assert.Nil(t, err)
assert.Equal(t, n, 1)
})
}

func fakeEmptyEventSource() *eventsourcev1alpha1.EventSource {
return &eventsourcev1alpha1.EventSource{
ObjectMeta: metav1.ObjectMeta{
Namespace: testNamespace,
Name: "test-es",
},
Spec: eventsourcev1alpha1.EventSourceSpec{},
}
}

func fakeCalendarEventSourceMap(name string) map[string]eventsourcev1alpha1.CalendarEventSource {
return map[string]eventsourcev1alpha1.CalendarEventSource{name: {Schedule: "*/5 * * * *"}}
}

func fakeSensor() *sensorv1alpha1.Sensor {
return &sensorv1alpha1.Sensor{
ObjectMeta: metav1.ObjectMeta{
Name: "fake-sensor",
Namespace: testNamespace,
},
Spec: sensorv1alpha1.SensorSpec{
Triggers: []v1alpha1.Trigger{
{
Template: &v1alpha1.TriggerTemplate{
Name: "fake-trigger",
K8s: &v1alpha1.StandardK8STrigger{
GroupVersionResource: metav1.GroupVersionResource{
Group: "k8s.io",
Version: "",
Resource: "pods",
},
Operation: "create",
Source: &v1alpha1.ArtifactLocation{},
},
},
},
},
Dependencies: []v1alpha1.EventDependency{
{
Name: "fake-dep",
EventSourceName: "fake-source",
EventName: "fake-one",
},
},
},
}
}
8 changes: 3 additions & 5 deletions controllers/eventbus/installer/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,11 @@ func NewNATSInstaller(client client.Client, eventBus *v1alpha1.EventBus, streami
}

// Install creats a StatefulSet and a Service for NATS
func (i *natsInstaller) Install() (*v1alpha1.BusConfig, error) {
func (i *natsInstaller) Install(ctx context.Context) (*v1alpha1.BusConfig, error) {
natsObj := i.eventBus.Spec.NATS
if natsObj == nil || natsObj.Native == nil {
return nil, errors.New("invalid request")
}
ctx := context.Background()

svc, err := i.createStanService(ctx)
if err != nil {
Expand Down Expand Up @@ -118,13 +117,12 @@ func (i *natsInstaller) Install() (*v1alpha1.BusConfig, error) {
}

// Uninstall deletes those objects not handeled by cascade deletion.
func (i *natsInstaller) Uninstall() error {
ctx := context.Background()
func (i *natsInstaller) Uninstall(ctx context.Context) error {
return i.uninstallPVCs(ctx)
}

func (i *natsInstaller) uninstallPVCs(ctx context.Context) error {
// StatefulSet doens't clean up PVC, needs to do it separately
// StatefulSet doesn't clean up PVC, needs to do it separately
// https://github.com/kubernetes/kubernetes/issues/55045
log := i.logger
pvcs, err := i.getPVCs(ctx, i.labels)
Expand Down