Skip to content

Commit

Permalink
feature: add fetchers.go to fetch EnvVar actual value
Browse files Browse the repository at this point in the history
  • Loading branch information
ptyin committed Feb 27, 2024
1 parent 87ba23b commit ad28f6e
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 15 deletions.
16 changes: 16 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,22 @@ kind: ClusterRole
metadata:
name: manager-role
rules:
- apiGroups:
- ""
resources:
- configmaps
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
- secrets
verbs:
- get
- list
- watch
- apiGroups:
- apps
resources:
Expand Down
19 changes: 18 additions & 1 deletion controllers/seataserver_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ const RequeueSeconds = 10
//+kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch
//+kubebuilder:rbac:groups="",resources=configmaps,verbs=get;list;watch
//+kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
Expand Down Expand Up @@ -183,7 +185,22 @@ func (r *SeataServerReconciler) updateStatefulSet(ctx context.Context, s *seatav
s.Status.Synchronized = false
}
if readySize == newSize && !s.Status.Synchronized {
if err = seata.SyncRaftCluster(ctx, s); err != nil {
username, password := "seata", "seata"
for _, env := range s.Spec.Env {
if env.Name == "console.user.username" {
username, err = seata.FetchEnvVar(ctx, r.Client, s, env)
if err != nil {
logger.Error(err, "Failed to fetch Env console.user.username")
}
}
if env.Name == "console.user.password" {
password, err = seata.FetchEnvVar(ctx, r.Client, s, env)
if err != nil {
logger.Error(err, "Failed to fetch Env console.user.password")
}
}
}
if err = seata.SyncRaftCluster(ctx, s, username, password); err != nil {
logger.Error(err, "Failed to synchronize the raft cluster")
s.Status.Synchronized = false
} else {
Expand Down
20 changes: 18 additions & 2 deletions deploy/seata-server-cluster.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,25 @@ metadata:
namespace: default
spec:
serviceName: seata-server-cluster
replicas: 2
image: seataio/seata-server:2.0.0
replicas: 1
image: seataio/seata-server:latest
store:
resources:
requests:
storage: 5Gi
env:
- name: console.user.username
value: seata
- name: console.user.password
valueFrom:
secretKeyRef:
name: seata
key: password
---
apiVersion: v1
kind: Secret
metadata:
name: seata
type: Opaque
data:
password: MTIzNDU2
70 changes: 70 additions & 0 deletions pkg/seata/fetchers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package seata

import (
"context"
"fmt"
seatav1alpha1 "github.com/apache/seata-k8s/api/v1alpha1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func FetchEnvVar(ctx context.Context, c client.Client, cr *seatav1alpha1.SeataServer, envVar v1.EnvVar) (string, error) {
if envVar.ValueFrom == nil {
return envVar.Value, nil
}

// Inspired by kubelet#makeEnvironmentVariables, determine the final values of variables.
// See https://github.com/kubernetes/kubernetes/blob/master/pkg/kubelet/kubelet_pods.go#L694-L806
var result string
switch {
case envVar.ValueFrom.ConfigMapKeyRef != nil:
cm := envVar.ValueFrom.ConfigMapKeyRef
name := cm.Name
key := cm.Key
optional := cm.Optional != nil && *cm.Optional

configMap := &v1.ConfigMap{}
err := c.Get(ctx, types.NamespacedName{Name: name, Namespace: cr.Namespace}, configMap)
if err != nil {
if errors.IsNotFound(err) && optional {
// ignore error when marked optional
return result, nil
}
return result, err
}
runtimeVal, ok := configMap.Data[key]
if !ok {
if optional {
return result, nil
}
return result, fmt.Errorf("couldn't find key %v in ConfigMap %v/%v", key, cr.Namespace, name)
}
result = runtimeVal
case envVar.ValueFrom.SecretKeyRef != nil:
s := envVar.ValueFrom.SecretKeyRef
name := s.Name
key := s.Key
optional := s.Optional != nil && *s.Optional
secret := &v1.Secret{}
err := c.Get(ctx, types.NamespacedName{Name: name, Namespace: cr.Namespace}, secret)
if err != nil {
if errors.IsNotFound(err) && optional {
// ignore error when marked optional
return result, nil
}
return result, err
}
runtimeValBytes, ok := secret.Data[key]
if !ok {
if optional {
return result, nil
}
return result, fmt.Errorf("couldn't find key %v in Secret %v/%v", key, cr.Namespace, name)
}
runtimeVal := string(runtimeValBytes)
result = runtimeVal
}
return result, nil
}
15 changes: 3 additions & 12 deletions pkg/seata/synchronizers.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,9 @@ type rspData struct {
Success bool `json:"success"`
}

func changeCluster(s *seatav1alpha1.SeataServer, i int32) error {
func changeCluster(s *seatav1alpha1.SeataServer, i int32, username string, password string) error {
client := http.Client{}
host := fmt.Sprintf("%s-%d.%s.%s.svc:%d", s.Name, i, s.Spec.ServiceName, s.Namespace, s.Spec.Ports.ConsolePort)
username, password := "seata", "seata"
for _, env := range s.Spec.Env {
if env.Name == "console.user.username" {
username = "seata"
}
if env.Name == "console.user.password" {
password = "seata"
}
}

values := map[string]string{"username": username, "password": password}
jsonValue, _ := json.Marshal(values)
Expand Down Expand Up @@ -101,7 +92,7 @@ func changeCluster(s *seatav1alpha1.SeataServer, i int32) error {
return nil
}

func SyncRaftCluster(ctx context.Context, s *seatav1alpha1.SeataServer) error {
func SyncRaftCluster(ctx context.Context, s *seatav1alpha1.SeataServer, username string, password string) error {
logger := log.FromContext(ctx)
group, childContext := errgroup.WithContext(ctx)

Expand All @@ -112,7 +103,7 @@ func SyncRaftCluster(ctx context.Context, s *seatav1alpha1.SeataServer) error {
case <-childContext.Done():
return nil
default:
err := changeCluster(s, finalI)
err := changeCluster(s, finalI, username, password)
if err != nil {
logger.Error(err, fmt.Sprintf("fail to SyncRaftCluster at %d-th pod", finalI))
}
Expand Down

0 comments on commit ad28f6e

Please sign in to comment.