Skip to content

Commit

Permalink
feat: use user/pass for JetStream authentication (#1809)
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Wang <whynowy@gmail.com>
  • Loading branch information
whynowy committed Apr 6, 2022
1 parent 86ac587 commit 9332679
Show file tree
Hide file tree
Showing 18 changed files with 195 additions and 492 deletions.
40 changes: 5 additions & 35 deletions api/event-bus.html

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

45 changes: 7 additions & 38 deletions api/event-bus.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 3 additions & 11 deletions api/jsonschema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -380,15 +380,6 @@
},
"type": "object"
},
"io.argoproj.eventbus.v1alpha1.JetStreamAuth": {
"properties": {
"token": {
"$ref": "#/definitions/io.k8s.api.core.v1.SecretKeySelector",
"description": "Secret for auth token"
}
},
"type": "object"
},
"io.argoproj.eventbus.v1alpha1.JetStreamBus": {
"description": "JetStreamBus holds the JetStream EventBus information",
"properties": {
Expand Down Expand Up @@ -484,8 +475,9 @@
},
"io.argoproj.eventbus.v1alpha1.JetStreamConfig": {
"properties": {
"auth": {
"$ref": "#/definitions/io.argoproj.eventbus.v1alpha1.JetStreamAuth"
"accessSecret": {
"$ref": "#/definitions/io.k8s.api.core.v1.SecretKeySelector",
"description": "Secret for auth"
},
"streamConfig": {
"type": "string"
Expand Down
14 changes: 3 additions & 11 deletions api/openapi-spec/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion common/leaderelection/leaderelection.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func NewEventBusElector(ctx context.Context, eventBusConfig eventbusv1alpha1.Bus
eventBusAuth = eventBusConfig.NATS.Auth
case eventBusConfig.JetStream != nil:
eventBusType = apicommon.EventBusJetStream
eventBusAuth = &eventbusv1alpha1.AuthStrategyToken
eventBusAuth = &eventbusv1alpha1.AuthStrategyBasic
default:
return nil, errors.New("invalid event bus")
}
Expand Down Expand Up @@ -108,6 +108,9 @@ func (e *natsEventBusElector) RunOrDie(ctx context.Context, callbacks LeaderCall
opts.Url = e.url
if e.auth.Strategy == eventbusv1alpha1.AuthStrategyToken {
opts.Token = e.auth.Crendential.Token
} else if e.auth.Strategy == eventbusv1alpha1.AuthStrategyBasic {
opts.User = e.auth.Crendential.Username
opts.Password = e.auth.Crendential.Password
}
rpc, err := graft.NewNatsRpc(opts)
if err != nil {
Expand Down
11 changes: 7 additions & 4 deletions controllers/eventbus/installer/assets/jetstream/server-auth.conf
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
system_account: sys
authorization {
token: "{{.Token}}"
}

accounts: {
"js": {
"jetstream": true,
"users": [
{"user": "{{.JetStreamUser}}", "pass": "{{.JetStreamPassword}}"}
]
},
"sys": {
"users": [
{"user": "sys", "pass":"{{.SysPassword}}"}
{"user": "sys", "pass": "{{.SysPassword}}"}
]
}
}
3 changes: 1 addition & 2 deletions controllers/eventbus/installer/installer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,6 @@ func TestInstall(t *testing.T) {
assert.True(t, testObj.Status.IsReady())
assert.NotNil(t, testObj.Status.Config.JetStream)
assert.NotEmpty(t, testObj.Status.Config.JetStream.URL)
assert.NotNil(t, testObj.Status.Config.JetStream.Auth)
assert.NotNil(t, testObj.Status.Config.JetStream.Auth.Token)
assert.NotNil(t, testObj.Status.Config.JetStream.AccessSecret)
})
}
25 changes: 13 additions & 12 deletions controllers/eventbus/installer/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,11 @@ func (r *jetStreamInstaller) Install(ctx context.Context) (*v1alpha1.BusConfig,
return &v1alpha1.BusConfig{
JetStream: &v1alpha1.JetStreamConfig{
URL: fmt.Sprintf("nats://%s.%s.svc.cluster.local:%s", generateJetStreamServiceName(r.eventBus), r.eventBus.Namespace, strconv.Itoa(int(jsClientPort))),
Auth: &v1alpha1.JetStreamAuth{
Token: &corev1.SecretKeySelector{
LocalObjectReference: corev1.LocalObjectReference{
Name: generateJetStreamClientAuthSecretName(r.eventBus),
},
Key: common.JetStreamClientAuthSecretKey,
AccessSecret: &corev1.SecretKeySelector{
LocalObjectReference: corev1.LocalObjectReference{
Name: generateJetStreamClientAuthSecretName(r.eventBus),
},
Key: common.JetStreamClientAuthSecretKey,
},
StreamConfig: string(b),
},
Expand Down Expand Up @@ -440,16 +438,19 @@ func (r *jetStreamInstaller) buildStatefulSetSpec(jsVersion *controllers.JetStre

func (r *jetStreamInstaller) createSecrets(ctx context.Context) error {
encryptionKey := common.RandomString(12)
token := common.RandomString(24)
jsUser := common.RandomString(8)
jsPass := common.RandomString(16)
sysPassword := common.RandomString(24)
authTpl := template.Must(template.ParseFS(jetStremAssets, "assets/jetstream/server-auth.conf"))
var authTplOutput bytes.Buffer
if err := authTpl.Execute(&authTplOutput, struct {
Token string
SysPassword string
JetStreamUser string
JetStreamPassword string
SysPassword string
}{
Token: token,
SysPassword: sysPassword,
JetStreamUser: jsUser,
JetStreamPassword: jsPass,
SysPassword: sysPassword,
}); err != nil {
return fmt.Errorf("failed to parse nats auth template, error: %w", err)
}
Expand Down Expand Up @@ -481,7 +482,7 @@ func (r *jetStreamInstaller) createSecrets(ctx context.Context) error {
},
Type: corev1.SecretTypeOpaque,
Data: map[string][]byte{
common.JetStreamClientAuthSecretKey: []byte(fmt.Sprintf("token: \"%s\"", token)),
common.JetStreamClientAuthSecretKey: []byte(fmt.Sprintf("username: %s\npassword: %s", jsUser, jsPass)),
},
}

Expand Down
7 changes: 2 additions & 5 deletions controllers/eventsource/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,17 +200,14 @@ func buildDeployment(args *AdaptorArgs, eventBus *eventbusv1alpha1.EventBus) (*a
}
encodedBusConfig := base64.StdEncoding.EncodeToString(busConfigBytes)
envVars = append(envVars, corev1.EnvVar{Name: common.EnvVarEventBusConfig, Value: encodedBusConfig})
var authStrategy *eventbusv1alpha1.AuthStrategy
var accessSecret *corev1.SecretKeySelector
switch {
case eventBus.Status.Config.NATS != nil:
natsConf := eventBus.Status.Config.NATS
authStrategy = natsConf.Auth
accessSecret = natsConf.AccessSecret
case eventBus.Status.Config.JetStream != nil:
jsConf := eventBus.Status.Config.JetStream
authStrategy = &eventbusv1alpha1.AuthStrategyToken
accessSecret = jsConf.Auth.Token
accessSecret = jsConf.AccessSecret
default:
return nil, errors.New("unsupported event bus")
}
Expand All @@ -223,7 +220,7 @@ func buildDeployment(args *AdaptorArgs, eventBus *eventbusv1alpha1.EventBus) (*a
})
volumeMounts = append(volumeMounts, corev1.VolumeMount{Name: emptyDirVolName, MountPath: "/tmp"})

if authStrategy != nil && accessSecret != nil {
if accessSecret != nil {
// Mount the secret as volume instead of using envFrom to gain the ability
// for the sensor deployment to auto reload when the secret changes
volumes = append(volumes, corev1.Volume{
Expand Down
7 changes: 2 additions & 5 deletions controllers/sensor/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,17 +167,14 @@ func buildDeployment(args *AdaptorArgs, eventBus *eventbusv1alpha1.EventBus) (*a
encodedBusConfig := base64.StdEncoding.EncodeToString(busConfigBytes)
envVars = append(envVars, corev1.EnvVar{Name: common.EnvVarEventBusConfig, Value: encodedBusConfig})

var authStrategy *eventbusv1alpha1.AuthStrategy
var accessSecret *corev1.SecretKeySelector
switch {
case eventBus.Status.Config.NATS != nil:
natsConf := eventBus.Status.Config.NATS
authStrategy = natsConf.Auth
accessSecret = natsConf.AccessSecret
case eventBus.Status.Config.JetStream != nil:
jsConf := eventBus.Status.Config.JetStream
authStrategy = &eventbusv1alpha1.AuthStrategyToken
accessSecret = jsConf.Auth.Token
accessSecret = jsConf.AccessSecret
default:
return nil, errors.New("unsupported event bus")
}
Expand All @@ -190,7 +187,7 @@ func buildDeployment(args *AdaptorArgs, eventBus *eventbusv1alpha1.EventBus) (*a
})
volumeMounts = append(volumeMounts, corev1.VolumeMount{Name: emptyDirVolName, MountPath: "/tmp"})

if authStrategy != nil && accessSecret != nil {
if accessSecret != nil {
// Mount the secret as volume instead of using envFrom to gain the ability
// for the sensor deployment to auto reload when the secret changes
volumes = append(volumes, corev1.Volume{
Expand Down
2 changes: 1 addition & 1 deletion eventbus/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func GetAuth(ctx context.Context, eventBusConfig eventbusv1alpha1.BusConfig) (*e
case eventBusConfig.NATS != nil:
eventBusAuth = eventBusConfig.NATS.Auth
case eventBusConfig.JetStream != nil:
eventBusAuth = &eventbusv1alpha1.AuthStrategyToken
eventBusAuth = &eventbusv1alpha1.AuthStrategyBasic
default:
return nil, errors.New("invalid event bus")
}
Expand Down
3 changes: 3 additions & 0 deletions eventbus/jetstream/base/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ func (stream *Jetstream) MakeConnection() (*JetstreamConnection, error) {
case eventbusv1alpha1.AuthStrategyToken:
log.Info("NATS auth strategy: Token")
opts = append(opts, nats.Token(stream.auth.Crendential.Token))
case eventbusv1alpha1.AuthStrategyBasic:
log.Info("NATS auth strategy: Basic")
opts = append(opts, nats.UserInfo(stream.auth.Crendential.Username, stream.auth.Crendential.Password))
case eventbusv1alpha1.AuthStrategyNone:
log.Info("NATS auth strategy: None")
default:
Expand Down

0 comments on commit 9332679

Please sign in to comment.