feat: EventSource and Sensor HA without extra RBAC #1163

Merged · 4 commits · Apr 8, 2021
159 changes: 159 additions & 0 deletions common/leaderelection/leaderelection.go
@@ -0,0 +1,159 @@
package leaderelection

import (
"context"

"github.com/fsnotify/fsnotify"
"github.com/nats-io/graft"
nats "github.com/nats-io/nats.go"
"github.com/pkg/errors"
"github.com/spf13/viper"
"go.uber.org/zap"

"github.com/argoproj/argo-events/common"
"github.com/argoproj/argo-events/common/logging"
eventbusdriver "github.com/argoproj/argo-events/eventbus/driver"
apicommon "github.com/argoproj/argo-events/pkg/apis/common"
eventbusv1alpha1 "github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1"
)

type Elector interface {
RunOrDie(context.Context, LeaderCallbacks)
}

type LeaderCallbacks struct {
OnStartedLeading func(context.Context)
OnStoppedLeading func()
}

func NewEventBusElector(ctx context.Context, eventBusConfig eventbusv1alpha1.BusConfig, clusterName string, clusterSize int) (Elector, error) {
logger := logging.FromContext(ctx)
var eventBusType apicommon.EventBusType
var eventBusAuth *eventbusv1alpha1.AuthStrategy
if eventBusConfig.NATS != nil {
eventBusType = apicommon.EventBusNATS
eventBusAuth = eventBusConfig.NATS.Auth
} else {
return nil, errors.New("invalid event bus")
}
var auth *eventbusdriver.Auth
cred := &eventbusdriver.AuthCredential{}
if eventBusAuth == nil || *eventBusAuth == eventbusv1alpha1.AuthStrategyNone {
auth = &eventbusdriver.Auth{
Strategy: eventbusv1alpha1.AuthStrategyNone,
}
} else {
v := viper.New()
v.SetConfigName("auth")
v.SetConfigType("yaml")
v.AddConfigPath(common.EventBusAuthFileMountPath)
err := v.ReadInConfig()
if err != nil {
return nil, errors.Errorf("failed to load auth.yaml. err: %+v", err)
}
err = v.Unmarshal(cred)
if err != nil {
logger.Errorw("failed to unmarshal auth.yaml", zap.Error(err))
return nil, err
}
v.WatchConfig()
v.OnConfigChange(func(e fsnotify.Event) {
logger.Info("eventbus auth config file changed.")
err = v.Unmarshal(cred)
if err != nil {
logger.Errorw("failed to unmarshal auth.yaml after reloading", zap.Error(err))
}
})
auth = &eventbusdriver.Auth{
Strategy: *eventBusAuth,
Crendential: cred,
}
}
var elector Elector
switch eventBusType {
case apicommon.EventBusNATS:
elector = &natsEventBusElector{
clusterName: clusterName,
size: clusterSize,
url: eventBusConfig.NATS.URL,
auth: auth,
}
default:
return nil, errors.New("invalid eventbus type")
}
return elector, nil
}

type natsEventBusElector struct {
clusterName string
size int
url string
auth *eventbusdriver.Auth
}

func (e *natsEventBusElector) RunOrDie(ctx context.Context, callbacks LeaderCallbacks) {
log := logging.FromContext(ctx)
ci := graft.ClusterInfo{Name: e.clusterName, Size: e.size}
opts := &nats.DefaultOptions
opts.Url = e.url
if e.auth.Strategy == eventbusv1alpha1.AuthStrategyToken {
opts.Token = e.auth.Crendential.Token
}
rpc, err := graft.NewNatsRpc(opts)
if err != nil {
log.Fatalw("failed to new Nats Rpc", zap.Error(err))
}
errChan := make(chan error)
stateChangeChan := make(chan graft.StateChange)
handler := graft.NewChanHandler(stateChangeChan, errChan)
node, err := graft.New(ci, handler, rpc, "/tmp/graft.log")
if err != nil {
log.Fatalw("failed to new a node", zap.Error(err))
}
defer node.Close()

cctx, cancel := context.WithCancel(ctx)
defer cancel()

if node.State() == graft.LEADER {
log.Info("I'm the LEADER, starting ...")
go callbacks.OnStartedLeading(cctx)
} else {
log.Info("Not the LEADER, stand by ...")
}

handleStateChange := func(sc graft.StateChange) {
switch sc.To {
case graft.LEADER:
log.Info("I'm the LEADER, starting ...")
go callbacks.OnStartedLeading(cctx)
case graft.FOLLOWER, graft.CANDIDATE:
log.Infof("Becoming a %v, stand by ...", sc.To)
if sc.From == graft.LEADER {
cancel()
callbacks.OnStoppedLeading()
cctx, cancel = context.WithCancel(ctx)
}
case graft.CLOSED:
if sc.From == graft.LEADER {
cancel()
callbacks.OnStoppedLeading()
}
log.Fatal("Leader elector connection was CLOSED")
default:
log.Fatalf("Unknown state: %s", sc.To)
}
}

for {
select {
case <-ctx.Done():
log.Info("exiting...")
return
case sc := <-stateChangeChan:
handleStateChange(sc)
case err := <-errChan:
log.Errorw("Error happened", zap.Error(err))
}
}
}
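
For context, a minimal usage sketch of the new `Elector` API (not part of this diff; the cluster name `eventsource-my-es` and size `3` are hypothetical — real callers derive them from the EventSource/Sensor name and `spec.replicas`):

```go
package main

import (
	"context"

	"github.com/argoproj/argo-events/common/leaderelection"
	eventbusv1alpha1 "github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1"
)

// runWithLeaderElection sketches how an adaptor could use the elector: only
// the elected leader serves; followers stand by until promoted or ctx ends.
func runWithLeaderElection(ctx context.Context, busConfig eventbusv1alpha1.BusConfig) error {
	// Hypothetical values: the cluster name comes from the object name and
	// the size from spec.replicas.
	elector, err := leaderelection.NewEventBusElector(ctx, busConfig, "eventsource-my-es", 3)
	if err != nil {
		return err
	}
	elector.RunOrDie(ctx, leaderelection.LeaderCallbacks{
		OnStartedLeading: func(cctx context.Context) {
			// Start the event source/sensor server here; cctx is cancelled
			// when this replica loses leadership.
			<-cctx.Done()
		},
		OnStoppedLeading: func() {
			// Whatever was started in OnStartedLeading must shut down now.
		},
	})
	return nil
}
```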
14 changes: 0 additions & 14 deletions controllers/eventsource/controller.go
@@ -4,7 +4,6 @@ import (
"context"

"go.uber.org/zap"
coordinationv1 "k8s.io/api/coordination/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
@@ -71,9 +70,6 @@ func (r *reconciler) reconcile(ctx context.Context, eventSource *v1alpha1.EventS
log.Info("deleting eventsource")
if controllerutil.ContainsFinalizer(eventSource, finalizerName) {
// Finalizer logic should be added here.
if err := r.finalize(ctx, eventSource); err != nil {
return err
}
controllerutil.RemoveFinalizer(eventSource, finalizerName)
}
return nil
@@ -97,16 +93,6 @@ func (r *reconciler) reconcile(ctx context.Context, eventSource *v1alpha1.EventS
return Reconcile(r.client, args, log)
}

func (r *reconciler) finalize(ctx context.Context, eventSource *v1alpha1.EventSource) error {
// Clean up Lease objects if there's any
if err := r.client.DeleteAllOf(ctx, &coordinationv1.Lease{},
client.InNamespace(eventSource.Namespace),
client.MatchingFields{"metadata.name": "eventsource-" + eventSource.Name}); err != nil {
return err
}
return nil
}

func (r *reconciler) needsUpdate(old, new *v1alpha1.EventSource) bool {
if old == nil {
return true
27 changes: 5 additions & 22 deletions controllers/eventsource/resource.go
@@ -20,8 +20,6 @@ import (

"github.com/argoproj/argo-events/common"
controllerscommon "github.com/argoproj/argo-events/controllers/common"
"github.com/argoproj/argo-events/eventsources"
apicommon "github.com/argoproj/argo-events/pkg/apis/common"
eventbusv1alpha1 "github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1"
"github.com/argoproj/argo-events/pkg/apis/eventsource/v1alpha1"
)
@@ -221,9 +219,14 @@ func buildDeployment(args *AdaptorArgs, eventBus *eventbusv1alpha1.EventBus) (*a
},
},
})
emptyDirVolName := "tmp"
volumes = append(volumes, corev1.Volume{
Name: emptyDirVolName, VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}},
})
deploymentSpec.Template.Spec.Volumes = volumes
volumeMounts := deploymentSpec.Template.Spec.Containers[0].VolumeMounts
volumeMounts = append(volumeMounts, corev1.VolumeMount{Name: "auth-volume", MountPath: common.EventBusAuthFileMountPath})
volumeMounts = append(volumeMounts, corev1.VolumeMount{Name: emptyDirVolName, MountPath: "/tmp"})
deploymentSpec.Template.Spec.Containers[0].VolumeMounts = volumeMounts
}
} else {
@@ -332,26 +335,6 @@ func buildDeploymentSpec(args *AdaptorArgs) (*appv1.DeploymentSpec, error) {
spec.Template.Spec.PriorityClassName = args.EventSource.Spec.Template.PriorityClassName
spec.Template.Spec.Priority = args.EventSource.Spec.Template.Priority
}
allEventTypes := eventsources.GetEventingServers(args.EventSource, nil)
Member Author commented:

We don't need this any more: with leader election, all the event source deployments can run with the rolling update strategy.
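
For reference, a sketch of what this falls back to (not part of the diff): with the block below removed, `spec.Strategy` is left unset, which is equivalent to the Kubernetes default rolling update:

```go
package main

import appv1 "k8s.io/api/apps/v1"

// defaultStrategy shows the Kubernetes default that an unset spec.Strategy
// resolves to once the Recreate override below is removed.
func defaultStrategy() appv1.DeploymentStrategy {
	return appv1.DeploymentStrategy{
		Type: appv1.RollingUpdateDeploymentStrategyType,
	}
}
```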

recreateTypes := make(map[apicommon.EventSourceType]bool)
for _, esType := range apicommon.RecreateStrategyEventSources {
recreateTypes[esType] = true
}
recreates := 0
for eventType := range allEventTypes {
if _, ok := recreateTypes[eventType]; ok {
recreates++
break
}
}
if recreates > 0 && replicas == 1 {
// For those event types, if there's only 1 replica, use recreate strategy.
// If replicas > 1, which means HA is available for them, rolling update strategy
// is better.
spec.Strategy = appv1.DeploymentStrategy{
Type: appv1.RecreateDeploymentStrategyType,
}
}
return spec, nil
}

14 changes: 0 additions & 14 deletions controllers/sensor/controller.go
@@ -20,7 +20,6 @@ import (
"context"

"go.uber.org/zap"
coordinationv1 "k8s.io/api/coordination/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
@@ -87,9 +86,6 @@ func (r *reconciler) reconcile(ctx context.Context, sensor *v1alpha1.Sensor) err
log.Info("deleting sensor")
if controllerutil.ContainsFinalizer(sensor, finalizerName) {
// Finalizer logic should be added here.
if err := r.finalize(ctx, sensor); err != nil {
return err
}
controllerutil.RemoveFinalizer(sensor, finalizerName)
}
return nil
@@ -113,16 +109,6 @@ func (r *reconciler) reconcile(ctx context.Context, sensor *v1alpha1.Sensor) err
return Reconcile(r.client, args, log)
}

func (r *reconciler) finalize(ctx context.Context, sensor *v1alpha1.Sensor) error {
// Clean up Lease objects if there's any
if err := r.client.DeleteAllOf(ctx, &coordinationv1.Lease{},
client.InNamespace(sensor.Namespace),
client.MatchingFields{"metadata.name": "sensor-" + sensor.Name}); err != nil {
return err
}
return nil
}

func (r *reconciler) needsUpdate(old, new *v1alpha1.Sensor) bool {
if old == nil {
return true
9 changes: 5 additions & 4 deletions controllers/sensor/resource.go
@@ -185,9 +185,14 @@ func buildDeployment(args *AdaptorArgs, eventBus *eventbusv1alpha1.EventBus) (*a
},
},
})
emptyDirVolName := "tmp"
volumes = append(volumes, corev1.Volume{
Name: emptyDirVolName, VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}},
})
deploymentSpec.Template.Spec.Volumes = volumes
volumeMounts := deploymentSpec.Template.Spec.Containers[0].VolumeMounts
volumeMounts = append(volumeMounts, corev1.VolumeMount{Name: "auth-volume", MountPath: common.EventBusAuthFileMountPath})
volumeMounts = append(volumeMounts, corev1.VolumeMount{Name: emptyDirVolName, MountPath: "/tmp"})
deploymentSpec.Template.Spec.Containers[0].VolumeMounts = volumeMounts
}
} else {
@@ -270,10 +275,6 @@ func buildDeploymentSpec(args *AdaptorArgs) (*appv1.DeploymentSpec, error) {
MatchLabels: args.Labels,
},
Replicas: &replicas,
Strategy: appv1.DeploymentStrategy{
Member Author commented:

With leader election, sensor deployments can also run with the rolling update strategy.

// Event bus does not allow multiple clients with same clientID to connect to the server at the same time.
Type: appv1.RecreateDeploymentStrategyType,
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: podTemplateLabels,
25 changes: 0 additions & 25 deletions docs/eventsources/ha.md
@@ -51,31 +51,6 @@ old one is gone.
- Calendar
- Generic

### RBAC

To achieve the `Active-Passive` strategy for these EventSources, a Service
Account with extra RBAC settings is needed. The Service Account needs to be
bound to a Role like the following, and specified in the spec through
`spec.template.serviceAccountName`.

```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: lease-role
rules:
- apiGroups:
- coordination.k8s.io
resources:
- leases
resourceNames:
- eventsource-{event-source-name}
verbs:
- "*"
```

**NOTE: This is not required if `spec.replicas = 1`.**

## More

Check [this](../dr_ha_recommendations.md) out to learn more information about
24 changes: 0 additions & 24 deletions docs/sensors/ha.md
@@ -9,30 +9,6 @@ elected to be active if the old one is gone.
**Please DO NOT manually scale up the replicas, as that might cause
unexpected behaviors!**

## RBAC

To achieve HA for Sensor Pods, a Service Account with extra RBAC settings is
needed. The Service Account needs to be bound to a Role like the following, and
specified in the spec through `spec.template.serviceAccountName`.

```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: lease-role
rules:
- apiGroups:
- coordination.k8s.io
resources:
- leases
resourceNames:
- sensor-{sensor-name}
verbs:
- "*"
```

**NOTE: This is not required if `spec.replicas = 1`.**

## More

Check [this](../dr_ha_recommendations.md) out to learn more information about
10 changes: 4 additions & 6 deletions docs/service-accounts.md
@@ -4,10 +4,9 @@

A `Service Account` can be specified in the EventSource object with
`spec.template.serviceAccountName`, however it is not needed for all the
EventSource types except `resource`, unless you want to achieve
[HA](eventsources/ha.md) for some of them. For a `resource` EventSource, you
need to specify a Service Account and give it `list` and `watch` permissions for
the resource being watched.
EventSource types except `resource`. For a `resource` EventSource, you need to
specify a Service Account and give it `list` and `watch` permissions for the
resource being watched.

For example, if you want to watch actions on `Deployment` objects, you need to:

@@ -31,8 +30,7 @@

A `Service Account` can also be specified in a Sensor object via
`spec.template.serviceAccountName`; this is only needed when a `k8s` trigger or
`argoWorkflow` trigger is defined in the Sensor object, or you want to run the
Sensor with [HA](sensors/ha.md).
`argoWorkflow` trigger is defined in the Sensor object.

The sensor examples provided by us use `argo-events-sa` service account to
execute the triggers, but it has more permissions than needed, and you may want