diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index f2b3a78..0ab68b1 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -4,6 +4,22 @@ kind: ClusterRole metadata: name: manager-role rules: +- apiGroups: + - "" + resources: + - configmaps + verbs: + - get + - list + - watch +- apiGroups: + - "" + resources: + - secrets + verbs: + - get + - list + - watch - apiGroups: - apps resources: diff --git a/controllers/seataserver_controller.go b/controllers/seataserver_controller.go index a73f184..899da36 100644 --- a/controllers/seataserver_controller.go +++ b/controllers/seataserver_controller.go @@ -54,6 +54,8 @@ const RequeueSeconds = 10 //+kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch +//+kubebuilder:rbac:groups="",resources=configmaps,verbs=get;list;watch +//+kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. @@ -183,7 +185,22 @@ func (r *SeataServerReconciler) updateStatefulSet(ctx context.Context, s *seatav s.Status.Synchronized = false } if readySize == newSize && !s.Status.Synchronized { - if err = seata.SyncRaftCluster(ctx, s); err != nil { + username, password := "seata", "seata" + for _, env := range s.Spec.Env { + if env.Name == "console.user.username" { + username, err = seata.FetchEnvVar(ctx, r.Client, s, env) + if err != nil { + logger.Error(err, "Failed to fetch Env console.user.username") + } + } + if env.Name == "console.user.password" { + password, err = seata.FetchEnvVar(ctx, r.Client, s, env) + if err != nil { + logger.Error(err, "Failed to fetch Env console.user.password") + } + } + } + if err = seata.SyncRaftCluster(ctx, s, username, password); err != nil { logger.Error(err, "Failed to synchronize the raft cluster") s.Status.Synchronized = false } else { diff --git a/deploy/seata-server-cluster.yaml b/deploy/seata-server-cluster.yaml index 4499c7a..db81345 100644 --- a/deploy/seata-server-cluster.yaml +++ b/deploy/seata-server-cluster.yaml @@ -5,9 +5,25 @@ metadata: namespace: default spec: serviceName: seata-server-cluster - replicas: 2 - image: seataio/seata-server:2.0.0 + replicas: 1 + image: seataio/seata-server:latest store: resources: requests: storage: 5Gi + env: + - name: console.user.username + value: seata + - name: console.user.password + valueFrom: + secretKeyRef: + name: seata + key: password +--- +apiVersion: v1 +kind: Secret +metadata: + name: seata +type: Opaque +data: + password: MTIzNDU2 diff --git a/pkg/seata/fetchers.go b/pkg/seata/fetchers.go new file mode 100644 index 0000000..26bca69 --- /dev/null +++ b/pkg/seata/fetchers.go @@ -0,0 +1,70 @@ +package seata + +import ( + "context" + "fmt" + seatav1alpha1 "github.com/apache/seata-k8s/api/v1alpha1" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +func FetchEnvVar(ctx context.Context, c client.Client, cr *seatav1alpha1.SeataServer, envVar v1.EnvVar) (string, error) { + if envVar.ValueFrom == nil { + return envVar.Value, nil + } + + // Inspired by kubelet#makeEnvironmentVariables, determine the final values of variables. + // See https://github.com/kubernetes/kubernetes/blob/master/pkg/kubelet/kubelet_pods.go#L694-L806 + var result string + switch { + case envVar.ValueFrom.ConfigMapKeyRef != nil: + cm := envVar.ValueFrom.ConfigMapKeyRef + name := cm.Name + key := cm.Key + optional := cm.Optional != nil && *cm.Optional + + configMap := &v1.ConfigMap{} + err := c.Get(ctx, types.NamespacedName{Name: name, Namespace: cr.Namespace}, configMap) + if err != nil { + if errors.IsNotFound(err) && optional { + // ignore error when marked optional + return result, nil + } + return result, err + } + runtimeVal, ok := configMap.Data[key] + if !ok { + if optional { + return result, nil + } + return result, fmt.Errorf("couldn't find key %v in ConfigMap %v/%v", key, cr.Namespace, name) + } + result = runtimeVal + case envVar.ValueFrom.SecretKeyRef != nil: + s := envVar.ValueFrom.SecretKeyRef + name := s.Name + key := s.Key + optional := s.Optional != nil && *s.Optional + secret := &v1.Secret{} + err := c.Get(ctx, types.NamespacedName{Name: name, Namespace: cr.Namespace}, secret) + if err != nil { + if errors.IsNotFound(err) && optional { + // ignore error when marked optional + return result, nil + } + return result, err + } + runtimeValBytes, ok := secret.Data[key] + if !ok { + if optional { + return result, nil + } + return result, fmt.Errorf("couldn't find key %v in Secret %v/%v", key, cr.Namespace, name) + } + runtimeVal := string(runtimeValBytes) + result = runtimeVal + } + return result, nil +} diff --git a/pkg/seata/synchronizers.go b/pkg/seata/synchronizers.go index adff796..6a12c33 100644 --- a/pkg/seata/synchronizers.go +++ b/pkg/seata/synchronizers.go @@ -33,18 +33,9 @@ type rspData struct { Success bool `json:"success"` } -func changeCluster(s *seatav1alpha1.SeataServer, i int32) error { +func changeCluster(s *seatav1alpha1.SeataServer, i int32, username string, password string) error { client := http.Client{} host := fmt.Sprintf("%s-%d.%s.%s.svc:%d", s.Name, i, s.Spec.ServiceName, s.Namespace, s.Spec.Ports.ConsolePort) - username, password := "seata", "seata" - for _, env := range s.Spec.Env { - if env.Name == "console.user.username" { - username = "seata" - } - if env.Name == "console.user.password" { - password = "seata" - } - } values := map[string]string{"username": username, "password": password} jsonValue, _ := json.Marshal(values) @@ -101,7 +92,7 @@ func changeCluster(s *seatav1alpha1.SeataServer, i int32) error { return nil } -func SyncRaftCluster(ctx context.Context, s *seatav1alpha1.SeataServer) error { +func SyncRaftCluster(ctx context.Context, s *seatav1alpha1.SeataServer, username string, password string) error { logger := log.FromContext(ctx) group, childContext := errgroup.WithContext(ctx) @@ -112,7 +103,7 @@ func SyncRaftCluster(ctx context.Context, s *seatav1alpha1.SeataServer) error { case <-childContext.Done(): return nil default: - err := changeCluster(s, finalI) + err := changeCluster(s, finalI, username, password) if err != nil { logger.Error(err, fmt.Sprintf("fail to SyncRaftCluster at %d-th pod", finalI)) }