Skip to content

Commit

Permalink
feat: amqp event source authentication (#1252)
Browse files Browse the repository at this point in the history
* feat: amqp event source authentication

Signed-off-by: Derek Wang <whynowy@gmail.com>

* example

Signed-off-by: Derek Wang <whynowy@gmail.com>
  • Loading branch information
whynowy committed Jun 22, 2021
1 parent d403c44 commit e92d105
Show file tree
Hide file tree
Showing 12 changed files with 508 additions and 368 deletions.
12 changes: 12 additions & 0 deletions api/event-source.html

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions api/event-source.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions api/openapi-spec/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 20 additions & 6 deletions eventsources/sources/amqp/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,21 +68,35 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt
amqpEventSource := &el.AMQPEventSource
var conn *amqplib.Connection
if err := common.Connect(amqpEventSource.ConnectionBackoff, func() error {
c := amqplib.Config{
Heartbeat: 10 * time.Second,
Locale: "en_US",
}
if amqpEventSource.TLS != nil {
tlsConfig, err := common.GetTLSConfig(amqpEventSource.TLS)
if err != nil {
return errors.Wrap(err, "failed to get the tls configuration")
}
conn, err = amqplib.DialTLS(amqpEventSource.URL, tlsConfig)
c.TLSClientConfig = tlsConfig
}
if amqpEventSource.Auth != nil {
username, err := common.GetSecretFromVolume(amqpEventSource.Auth.Username)
if err != nil {
return err
return errors.Wrap(err, "username not founnd")
}
} else {
var err error
conn, err = amqplib.Dial(amqpEventSource.URL)
password, err := common.GetSecretFromVolume(amqpEventSource.Auth.Password)
if err != nil {
return err
return errors.Wrap(err, "password not founnd")
}
c.SASL = []amqplib.Authentication{&amqplib.AMQPlainAuth{
Username: username,
Password: password,
}}
}
var err error
conn, err = amqplib.DialConfig(amqpEventSource.URL, c)
if err != nil {
return err
}
return nil
}); err != nil {
Expand Down
3 changes: 3 additions & 0 deletions eventsources/sources/amqp/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,8 @@ func validate(eventSource *v1alpha1.AMQPEventSource) error {
if eventSource.TLS != nil {
return apicommon.ValidateTLSConfig(eventSource.TLS)
}
if eventSource.Auth != nil {
return apicommon.ValidateBasicAuth(eventSource.Auth)
}
return nil
}
9 changes: 9 additions & 0 deletions examples/event-sources/amqp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,15 @@ spec:
exclusive: false
noLocal: false
noWait: false
# username and password for authentication
# use secret selectors
auth:
username:
name: my-secret
key: username
password:
name: my-secret
key: password


# example-tls:
Expand Down
25 changes: 20 additions & 5 deletions pkg/apis/common/validate.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package common

import "errors"
import (
fmt "fmt"
)

// ValidateTLSConfig validates a TLS configuration.
func ValidateTLSConfig(tlsConfig *TLSConfig) error {
Expand All @@ -22,11 +24,24 @@ func ValidateTLSConfig(tlsConfig *TLSConfig) error {
}

if !caCertSet && !clientCertSet && !clientKeySet {
return errors.New("invalid tls config, please configure either caCertSecret, or clientCertSecret and clientKeySecret, or both")
return fmt.Errorf("invalid tls config, please configure either caCertSecret, or clientCertSecret and clientKeySecret, or both")
}

if (clientCertSet || clientKeySet) && (!clientCertSet || !clientKeySet) {
return errors.New("invalid tls config, both clientCertSecret and clientKeySecret need to be configured")
return fmt.Errorf("invalid tls config, both clientCertSecret and clientKeySecret need to be configured")
}
return nil
}

func ValidateBasicAuth(auth *BasicAuth) error {
if auth == nil {
return nil
}
if auth.Username == nil {
return fmt.Errorf("username missing")
}
if auth.Password == nil {
return fmt.Errorf("password missing")
}
return nil
}
Expand All @@ -39,12 +54,12 @@ func ValidateSASLConfig(saslConfig *SASLConfig) error {
switch saslConfig.Mechanism {
case "", "PLAIN", "OAUTHBEARER", "SCRAM-SHA-256", "SCRAM-SHA-512", "GSSAPI":
default:
return errors.New("invalid sasl config. Possible values for SASL Mechanism are `OAUTHBEARER`, `PLAIN`, `SCRAM-SHA-256`, `SCRAM-SHA-512` and `GSSAPI`")
return fmt.Errorf("invalid sasl config. Possible values for SASL Mechanism are `OAUTHBEARER`, `PLAIN`, `SCRAM-SHA-256`, `SCRAM-SHA-512` and `GSSAPI`")
}

// user and password must both be set
if saslConfig.UserSecret == nil || saslConfig.PasswordSecret == nil {
return errors.New("invalid sasl config, both userSecret and passwordSecret must be defined")
return fmt.Errorf("invalid sasl config, both userSecret and passwordSecret must be defined")
}

return nil
Expand Down
Loading

0 comments on commit e92d105

Please sign in to comment.