Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions cmd/memberagent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,10 @@ func Start(ctx context.Context, hubCfg *rest.Config, hubOpts ctrl.Options) error

membershipChan := make(chan fleetv1alpha1.ClusterState)
internalMemberClusterChan := make(chan fleetv1alpha1.ClusterState)
defer close(membershipChan)
defer close(internalMemberClusterChan)

if err = memberinternalmembercluster.NewMemberReconciler(
if err = memberinternalmembercluster.NewReconciler(
hubMrg.GetClient(), memberMgr.GetClient(), restMapper, internalMemberClusterChan,
membershipChan).SetupWithManager(hubMrg); err != nil {
return errors.Wrap(err, "unable to create controller hub_member")
Expand All @@ -111,12 +113,8 @@ func Start(ctx context.Context, hubCfg *rest.Config, hubOpts ctrl.Options) error
os.Exit(1)
}

if err = (&membership.Reconciler{
Client: memberMgr.GetClient(),
Scheme: memberMgr.GetScheme(),
MembershipChan: membershipChan,
InternalMemberClusterChan: internalMemberClusterChan,
}).SetupWithManager(memberMgr); err != nil {
if err = membership.NewReconciler(memberMgr.GetClient(), internalMemberClusterChan, membershipChan).
SetupWithManager(memberMgr); err != nil {
return errors.Wrap(err, "unable to create controller membership")
}

Expand Down Expand Up @@ -146,5 +144,6 @@ func Start(ctx context.Context, hubCfg *rest.Config, hubOpts ctrl.Options) error
if err := memberMgr.Start(ctx); err != nil {
return errors.Wrap(err, "problem starting member manager")
}

return nil
}
1 change: 0 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,6 @@ github.com/google/btree v1.0.1/go.mod h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9
github.com/google/cadvisor v0.43.0/go.mod h1:+RdMSbc3FVr5NYCD2dOEJy/LI0jYJ/0xJXkzWXEyiFQ=
github.com/google/cel-go v0.9.0/go.mod h1:U7ayypeSkw23szu4GaQTPJGx66c20mx8JklMSxrmI1w=
github.com/google/cel-spec v0.6.0/go.mod h1:Nwjgxy5CbjlPrtCWjeDjUyKMl8w41YBYGjsyDdqk0xA=
github.com/google/gnostic v0.5.7-v3refs/go.mod h1:73MKFl6jIHelAJNaBGFzt3SPtZULs9dYrGFt8OiIsHQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
Expand Down
63 changes: 29 additions & 34 deletions pkg/controllers/memberinternalmembercluster/member_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"context"
"sync"

Comment thread
helayoty marked this conversation as resolved.
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
Expand All @@ -18,45 +17,27 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log"

fleetv1alpha1 "go.goms.io/fleet/apis/v1alpha1"
"go.goms.io/fleet/pkg/utils"
)

// TODO (mng): move `util` pkg to `controller/util`

// MemberReconciler reconciles a InternalMemberCluster object in the member cluster.
type MemberReconciler struct {
// Reconciler reconciles a InternalMemberCluster object in the member cluster.
type Reconciler struct {
hubClient client.Client
memberClient client.Client
restMapper meta.RESTMapper
recorder record.EventRecorder
internalMemberClusterChan chan<- fleetv1alpha1.ClusterState
membershipChan <-chan fleetv1alpha1.ClusterState
membershipState fleetv1alpha1.ClusterState
membershipStateLock sync.RWMutex
}

var membershipStateThreadSafe struct {
mu sync.Mutex
state fleetv1alpha1.ClusterState
}

func watchMembershipChan(membershipChan <-chan fleetv1alpha1.ClusterState) {
for range membershipChan {
membershipStateSignal, more := <-membershipChan
if !more {
return
}
klog.InfoS("membership state",
"membership", membershipStateSignal)

membershipStateThreadSafe.mu.Lock()
membershipStateThreadSafe.state = membershipStateSignal
membershipStateThreadSafe.mu.Unlock()
}
}

func NewMemberReconciler(hubClient client.Client, memberClient client.Client, restMapper meta.RESTMapper,
// NewReconciler creates a new reconciler for the internal membership CR
func NewReconciler(hubClient client.Client, memberClient client.Client, restMapper meta.RESTMapper,
internalMemberClusterChan chan<- fleetv1alpha1.ClusterState,
membershipChan <-chan fleetv1alpha1.ClusterState) *MemberReconciler {
return &MemberReconciler{
membershipChan <-chan fleetv1alpha1.ClusterState) *Reconciler {
return &Reconciler{
hubClient: hubClient,
memberClient: memberClient,
restMapper: restMapper,
Expand All @@ -69,20 +50,34 @@ func NewMemberReconciler(hubClient client.Client, memberClient client.Client, re
//+kubebuilder:rbac:groups=fleet.azure.com,resources=internalmemberclusters/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=fleet.azure.com,resources=internalmemberclusters/finalizers,verbs=update

func (r *MemberReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
_ = log.FromContext(ctx)

// TODO (mng): This is placeholder for implementation of GetConfigWithSecret
var secret v1.Secret
_, _ = utils.GetConfigWithSecret(secret)
// TODO (mng): This is placeholder for implementation of internalMemberCluster
r.getMembershipClusterState()

return ctrl.Result{}, nil
}

func (r *Reconciler) getMembershipClusterState() fleetv1alpha1.ClusterState {
r.membershipStateLock.RLock()
defer r.membershipStateLock.RUnlock()
return r.membershipState
}

func (r *Reconciler) watchMembershipChan() {
for membershipState := range r.membershipChan {
klog.InfoS("membership state has changed", "membershipState", membershipState)
r.membershipStateLock.Lock()
r.membershipState = membershipState
r.membershipStateLock.Unlock()
}
}

// SetupWithManager sets up the controller with the Manager.
func (r *MemberReconciler) SetupWithManager(mgr ctrl.Manager) error {
r.recorder = mgr.GetEventRecorderFor("InternalMemberCluster_member")
go watchMembershipChan(r.membershipChan)
func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
r.recorder = mgr.GetEventRecorderFor("MemberShipController")
go r.watchMembershipChan()
return ctrl.NewControllerManagedBy(mgr).
For(&fleetv1alpha1.InternalMemberCluster{}).
Complete(r)
Expand Down
152 changes: 72 additions & 80 deletions pkg/controllers/membership/membership_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ import (
"sync"
"time"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
apierr "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -21,43 +22,86 @@ import (
"go.goms.io/fleet/apis"
fleetv1alpha1 "go.goms.io/fleet/apis/v1alpha1"
"go.goms.io/fleet/pkg/controllers/common"
)

"github.com/pkg/errors"
errors2 "k8s.io/apimachinery/pkg/api/errors"
// Reconcile event reasons.
const (
reasonMembershipJoined = "MembershipJoined"
reasonMembershipJoinUnknown = "MembershipJoinUnknown"
)

// Reconciler reconciles a Membership object
type Reconciler struct {
client.Client
Scheme *runtime.Scheme
recorder record.EventRecorder
InternalMemberClusterChan <-chan fleetv1alpha1.ClusterState
MembershipChan chan<- fleetv1alpha1.ClusterState
recorder record.EventRecorder
internalMemberClusterChan <-chan fleetv1alpha1.ClusterState
membershipChan chan<- fleetv1alpha1.ClusterState
internalMemberClusterState fleetv1alpha1.ClusterState
clusterStateLock sync.RWMutex
}

// Reconcile event reasons.
const (
reasonMembershipJoined = "MembershipJoined"
reasonMembershipJoinUnknown = "MembershipJoinUnknown"
)
// NewReconciler creates a new Reconciler for membership
func NewReconciler(hubClient client.Client, internalMemberClusterChan <-chan fleetv1alpha1.ClusterState,
Comment thread
helayoty marked this conversation as resolved.
membershipChan chan<- fleetv1alpha1.ClusterState) *Reconciler {
return &Reconciler{
Client: hubClient,
internalMemberClusterChan: internalMemberClusterChan,
membershipChan: membershipChan,
}
}

//+kubebuilder:rbac:groups=fleet.azure.com,resources=memberships,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=fleet.azure.com,resources=memberships/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=fleet.azure.com,resources=memberships/finalizers,verbs=update
//+kubebuilder:rbac:groups="",resources=events,verbs=create;patch

// Reconcile reconciles membership Custom Resource on member cluster.
func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
var clusterMembership fleetv1alpha1.Membership

if err := r.Client.Get(ctx, req.NamespacedName, &clusterMembership); err != nil {
if !apierr.IsNotFound(err) {
return ctrl.Result{}, errors.Wrap(err, "error getting membership CR")
}
}

if clusterMembership.Spec.State == fleetv1alpha1.ClusterStateJoin {
r.membershipChan <- fleetv1alpha1.ClusterStateJoin
internalMemberClusterState := r.getInternalMemberClusterState()
if internalMemberClusterState == fleetv1alpha1.ClusterStateJoin {
r.markMembershipJoinSucceed(&clusterMembership)
err := r.Client.Update(ctx, &clusterMembership)
return ctrl.Result{}, errors.Wrap(err, "error marking membership as joined")
}
// the state can be leave or unknown
r.markMembershipJoinUnknown(&clusterMembership, nil)
err := r.Client.Update(ctx, &clusterMembership)
return ctrl.Result{RequeueAfter: time.Minute}, errors.Wrap(err, "error marking membership as unknown")
}
// This is when the state is leave
r.membershipChan <- fleetv1alpha1.ClusterStateLeave
return ctrl.Result{}, nil
}

var internalMemberClusterStateThreadSafe struct {
mu sync.Mutex
state fleetv1alpha1.ClusterState
func (r *Reconciler) getInternalMemberClusterState() fleetv1alpha1.ClusterState {
r.clusterStateLock.RLock()
Comment thread
minhng22 marked this conversation as resolved.
defer r.clusterStateLock.RUnlock()
return r.internalMemberClusterState
}

// checkStateJoining checks if the state of the membership controller is Joining.
// This indicates that the join flow has finished on the InternalMemberCluster side.
func checkStateJoining() bool {
internalMemberClusterStateThreadSafe.mu.Lock()
defer internalMemberClusterStateThreadSafe.mu.Unlock()
return internalMemberClusterStateThreadSafe.state == fleetv1alpha1.ClusterStateJoin
func (r *Reconciler) watchInternalMemberClusterChan() {
for internalMemberClusterState := range r.internalMemberClusterChan {
klog.InfoS("internal memberCluster state has changed", "internalMemberCluster", internalMemberClusterState)
r.clusterStateLock.Lock()
r.internalMemberClusterState = internalMemberClusterState
r.clusterStateLock.Unlock()
}
}

func markMembershipJoinSucceed(recorder record.EventRecorder, membership apis.ConditionedObj) {
func (r *Reconciler) markMembershipJoinSucceed(membership apis.ConditionedObj) {
klog.InfoS("mark membership joined",
"namespace", membership.GetNamespace(), "membership", membership.GetName())
recorder.Event(membership, corev1.EventTypeNormal, reasonMembershipJoined, "mark membership joined")
r.recorder.Event(membership, corev1.EventTypeNormal, reasonMembershipJoined, "membership joined")
joinedCondition := metav1.Condition{
Type: fleetv1alpha1.ConditionTypeMembershipJoin,
Status: metav1.ConditionTrue,
Expand All @@ -67,10 +111,11 @@ func markMembershipJoinSucceed(recorder record.EventRecorder, membership apis.Co
membership.SetConditions(joinedCondition, common.ReconcileSuccessCondition())
}

func markMembershipJoinUnknown(recorder record.EventRecorder, membership apis.ConditionedObj, err error) {
klog.InfoS("mark membership join unknown",
// TODO: separate out the error case from the real "joining" condition
func (r *Reconciler) markMembershipJoinUnknown(membership apis.ConditionedObj, err error) {
klog.V(5).InfoS("mark membership join unknown",
Comment thread
helayoty marked this conversation as resolved.
"namespace", membership.GetNamespace(), "membership", membership.GetName())
recorder.Event(membership, corev1.EventTypeNormal, reasonMembershipJoinUnknown, "mark membership join unknown")
r.recorder.Event(membership, corev1.EventTypeNormal, reasonMembershipJoinUnknown, "membership join unknown")

var errMsg string
if err != nil {
Expand All @@ -86,63 +131,10 @@ func markMembershipJoinUnknown(recorder record.EventRecorder, membership apis.Co
membership.SetConditions(joinUnknownCondition, common.ReconcileErrorCondition(err))
}

func watchInternalMemberClusterChan(imcState <-chan fleetv1alpha1.ClusterState) {
for range imcState {
internalMemberClusterStateSignal, more := <-imcState
if !more {
return
}
klog.InfoS("internal member cluster state",
"internalMemberCluster", internalMemberClusterStateSignal)

internalMemberClusterStateThreadSafe.mu.Lock()
internalMemberClusterStateThreadSafe.state = internalMemberClusterStateSignal
internalMemberClusterStateThreadSafe.mu.Unlock()
}
}

//+kubebuilder:rbac:groups=fleet.azure.com,resources=memberships,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=fleet.azure.com,resources=memberships/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=fleet.azure.com,resources=memberships/finalizers,verbs=update
//+kubebuilder:rbac:groups="",resources=events,verbs=create;patch

// Reconcile reconciles membership Custom Resource on member cluster.
func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
var clusterMembership fleetv1alpha1.Membership

if err := r.Client.Get(ctx, req.NamespacedName, &clusterMembership); err != nil {
if !errors2.IsNotFound(err) {
return ctrl.Result{}, errors.Wrap(err, "error getting membership")
}
}

if clusterMembership.Spec.MemberClusterName == clusterMembership.Name {
if clusterMembership.Spec.State == fleetv1alpha1.ClusterStateJoin {
markMembershipJoinUnknown(r.recorder, &clusterMembership, nil)
r.MembershipChan <- fleetv1alpha1.ClusterStateJoin

if err := r.Client.Update(ctx, &clusterMembership); err != nil {
return ctrl.Result{}, errors.Wrap(err, "error marking membership as joined")
}

if checkStateJoining() {
markMembershipJoinSucceed(r.recorder, &clusterMembership)
if err := r.Client.Update(ctx, &clusterMembership); err != nil {
return ctrl.Result{}, errors.Wrap(err, "error marking membership as joined")
}
}

return ctrl.Result{RequeueAfter: time.Minute}, nil
}
}

return ctrl.Result{}, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
r.recorder = mgr.GetEventRecorderFor("membership")
go watchInternalMemberClusterChan(r.InternalMemberClusterChan)
go r.watchInternalMemberClusterChan()
return ctrl.NewControllerManagedBy(mgr).
For(&fleetv1alpha1.Membership{}).
Complete(r)
Expand Down