Skip to content

Commit

Permalink
# This is a combination of 2 commits.
Browse files Browse the repository at this point in the history
# This is the 1st commit message:

add incoming flag to peer and unpeer commands

# This is the commit message liqotech#2:

Update pkg/utils/foreignCluster/peeringStatus.go

Co-authored-by: Francesco Torta <62566275+fra98@users.noreply.github.com>
  • Loading branch information
hamzalsheikh and fra98 committed Mar 1, 2024
1 parent 98c64c8 commit 9047acc
Show file tree
Hide file tree
Showing 10 changed files with 146 additions and 10 deletions.
3 changes: 2 additions & 1 deletion cmd/liqoctl/cmd/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func newPeerCommand(ctx context.Context, f *factory.Factory) *cobra.Command {
}

cmd.PersistentFlags().DurationVar(&options.Timeout, "timeout", 120*time.Second, "Timeout for peering completion")

cmd.PersistentFlags().BoolVar(&options.Incoming, "incoming", false, "Allows incoming peering")
cmd.AddCommand(newPeerOutOfBandCommand(ctx, options))
cmd.AddCommand(newPeerInBandCommand(ctx, options))
return cmd
Expand Down Expand Up @@ -182,6 +182,7 @@ func newPeerInBandCommand(ctx context.Context, peerOptions *peer.Options) *cobra

Run: func(cmd *cobra.Command, args []string) {
options.Timeout = peerOptions.Timeout
options.Incoming = peerOptions.Incoming
output.ExitOnErr(options.Run(ctx))
},
}
Expand Down
3 changes: 2 additions & 1 deletion cmd/liqoctl/cmd/unpeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func newUnpeerCommand(ctx context.Context, f *factory.Factory) *cobra.Command {
}

cmd.PersistentFlags().DurationVar(&options.Timeout, "timeout", 120*time.Second, "Timeout for unpeering completion")

cmd.PersistentFlags().BoolVar(&options.Incoming, "incoming", false , "Dis-allowing peering")
cmd.AddCommand(newUnpeerOutOfBandCommand(ctx, options))
cmd.AddCommand(newUnpeerInBandCommand(ctx, options))
return cmd
Expand Down Expand Up @@ -158,6 +158,7 @@ func newUnpeerInBandCommand(ctx context.Context, unpeerOptions *unpeeroob.Option

Run: func(cmd *cobra.Command, args []string) {
options.Timeout = unpeerOptions.Timeout
options.Incoming = unpeerOptions.Incoming
output.ExitOnErr(options.Run(ctx))
},
}
Expand Down
30 changes: 29 additions & 1 deletion pkg/liqoctl/inband/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -827,6 +827,22 @@ func (c *Cluster) EnforceOutgoingPeeringFlag(ctx context.Context, remoteID *disc
return nil
}

// EnforceIncomingPeeringFlag sets the incoming peering flag for a given foreign cluster.
func (c *Cluster) EnforceIncomingPeeringFlag(ctx context.Context, remoteID *discoveryv1alpha1.ClusterIdentity, enabled bool) error {
s := c.local.Printer.StartSpinner(fmt.Sprintf("configuring the incoming peering flag for the remote cluster %q", remoteID.ClusterName))
if _, err := controllerutil.CreateOrUpdate(ctx, c.local.CRClient, c.foreignCluster, func() error {
if enabled {
c.foreignCluster.Spec.IncomingPeeringEnabled = discoveryv1alpha1.PeeringEnabledYes
}
return nil
}); err != nil {
s.Fail(fmt.Sprintf("an error occurred while configuring the incoming peering flag for remote cluster %q: %v", remoteID.ClusterName, err))
return err
}
s.Success(fmt.Sprintf("incoming peering flag for remote cluster %q correctly configured", remoteID.ClusterName))
return nil
}

// DeleteForeignCluster deletes the foreignclusters instance for the given remote cluster.
func (c *Cluster) DeleteForeignCluster(ctx context.Context, remoteClusterID *discoveryv1alpha1.ClusterIdentity) error {
remID := remoteClusterID.ClusterID
Expand Down Expand Up @@ -861,7 +877,7 @@ func (c *Cluster) DeleteForeignCluster(ctx context.Context, remoteClusterID *dis
}

// DisablePeering disables the peering for the remote cluster by patching the foreigncusters resource.
func (c *Cluster) DisablePeering(ctx context.Context, remoteClusterID *discoveryv1alpha1.ClusterIdentity) (err error) {
func (c *Cluster) DisablePeering(ctx context.Context, remoteClusterID *discoveryv1alpha1.ClusterIdentity, incoming bool) (err error) {
remID := remoteClusterID.ClusterID
remName := remoteClusterID.ClusterName
s := c.local.Printer.StartSpinner(fmt.Sprintf("disabling peering for the remote cluster %q", remName))
Expand Down Expand Up @@ -894,6 +910,18 @@ func (c *Cluster) DisablePeering(ctx context.Context, remoteClusterID *discovery
remName, fc.Spec.PeeringType, discoveryv1alpha1.PeeringTypeInBand)
}

// Set incoming peering to no and return if flag is set
if incoming {
if _, err = controllerutil.CreateOrUpdate(ctx, c.local.CRClient, fc, func() error {
fc.Spec.IncomingPeeringEnabled = "No"
return nil
}); err != nil {
return fmt.Errorf("an error occurred while disabling incoming peering for remote cluster %q: %w", remName, err)
}
s.Success(fmt.Sprintf("incoming peering correctly disabled for remote cluster %q", remName))
return nil
}

// Set outgoing peering to no.
if _, err = controllerutil.CreateOrUpdate(ctx, c.local.CRClient, fc, func() error {
fc.Spec.OutgoingPeeringEnabled = "No"
Expand Down
23 changes: 22 additions & 1 deletion pkg/liqoctl/peer/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Options struct {

ClusterName string
Timeout time.Duration
Incoming bool
}

// Run implements the peer out-of-band command.
Expand All @@ -46,6 +47,15 @@ func (o *Options) Run(ctx context.Context) error {
s.Fail(err.Error())
return err
}

if o.Incoming {
o.Printer.Success.Println("Incoming peering enabled")
if err = o.Wait(ctx, remoteClusterID); err != nil {
return err
}
return nil
}

s.Success("Peering enabled")

if err = o.Wait(ctx, remoteClusterID); err != nil {
Expand All @@ -64,7 +74,11 @@ func (o *Options) peer(ctx context.Context) (*discoveryv1alpha1.ClusterIdentity,
return nil, err
}

fc.Spec.OutgoingPeeringEnabled = discoveryv1alpha1.PeeringEnabledYes
if o.Incoming {
fc.Spec.IncomingPeeringEnabled = discoveryv1alpha1.PeeringEnabledYes
} else {
fc.Spec.OutgoingPeeringEnabled = discoveryv1alpha1.PeeringEnabledYes
}

return &fc.Spec.ClusterIdentity, retry.RetryOnConflict(retry.DefaultBackoff, func() error {
return o.CRClient.Update(ctx, &fc)
Expand All @@ -75,6 +89,13 @@ func (o *Options) peer(ctx context.Context) (*discoveryv1alpha1.ClusterIdentity,
func (o *Options) Wait(ctx context.Context, remoteClusterID *discoveryv1alpha1.ClusterIdentity) error {
waiter := wait.NewWaiterFromFactory(o.Factory)

if o.Incoming {
if err := waiter.ForIncomingPeering(ctx, remoteClusterID); err != nil {
return err
}
return nil
}

if err := waiter.ForAuth(ctx, remoteClusterID); err != nil {
return err
}
Expand Down
15 changes: 15 additions & 0 deletions pkg/liqoctl/peerib/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type Options struct {

Bidirectional bool
Timeout time.Duration
Incoming bool
}

// Run implements the peer in-band command.
Expand Down Expand Up @@ -136,6 +137,20 @@ func (o *Options) Run(ctx context.Context) error {
return err
}

// Allowing Incoming peering when the flag is set
if o.Incoming {
// Setting the foreign cluster incoming flag in cluster 1 for cluster 2
if err := cluster1.EnforceIncomingPeeringFlag(ctx, cluster2.GetClusterID(), true); err != nil {
return err
}

// Setting the foreign cluster incoming flag in cluster 2 for cluster 1
if err := cluster2.EnforceIncomingPeeringFlag(ctx, cluster1.GetClusterID(), o.Bidirectional); err != nil {
return err
}
return nil
}

// Setting the foreign cluster outgoing flag in cluster 1 for cluster 2
// This operation is performed after that both foreign clusters have already been successfully created, to prevent a
// possible race condition in which the resource request originated by the local foreign cluster is replicated to and
Expand Down
11 changes: 8 additions & 3 deletions pkg/liqoctl/peeroob/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,14 @@ func (o *Options) enforceForeignCluster(ctx context.Context) (*discoveryv1alpha1

fc.Spec.ForeignAuthURL = o.ClusterAuthURL
fc.Spec.ForeignProxyURL = ""
fc.Spec.OutgoingPeeringEnabled = discoveryv1alpha1.PeeringEnabledYes
if fc.Spec.IncomingPeeringEnabled == "" {
fc.Spec.IncomingPeeringEnabled = discoveryv1alpha1.PeeringEnabledAuto

if o.Incoming {
fc.Spec.IncomingPeeringEnabled = discoveryv1alpha1.PeeringEnabledYes
} else {
fc.Spec.OutgoingPeeringEnabled = discoveryv1alpha1.PeeringEnabledYes
if fc.Spec.IncomingPeeringEnabled == "" {
fc.Spec.IncomingPeeringEnabled = discoveryv1alpha1.PeeringEnabledAuto
}
}
if fc.Spec.InsecureSkipTLSVerify == nil {
fc.Spec.InsecureSkipTLSVerify = pointer.BoolPtr(true)
Expand Down
12 changes: 10 additions & 2 deletions pkg/liqoctl/unpeerib/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type Options struct {
RemoteLiqoNamespace string

Timeout time.Duration
Incoming bool
}

// Run implements the unpeer in-band command.
Expand All @@ -51,15 +52,21 @@ func (o *Options) Run(ctx context.Context) error {
}

// Disable peering in cluster 1.
if err := cluster1.DisablePeering(ctx, cluster2.GetClusterID()); err != nil {
if err := cluster1.DisablePeering(ctx, cluster2.GetClusterID(), o.Incoming); err != nil {
return err
}

// Disable peering in cluster 2.
if err := cluster2.DisablePeering(ctx, cluster1.GetClusterID()); err != nil {
if err := cluster2.DisablePeering(ctx, cluster1.GetClusterID(), o.Incoming); err != nil {
return err
}

// DisablePeering only disables incoming peering when the flag is set
// Return if incoming flag is set
if o.Incoming {
return nil
}

// Wait to unpeer in cluster 1.
if err := cluster1.Waiter.ForUnpeering(ctx, cluster2.GetClusterID()); err != nil {
return err
Expand Down Expand Up @@ -113,6 +120,7 @@ func (o *Options) Run(ctx context.Context) error {
if err := cluster2.UnmapAuthIPForCluster(ctx, ipamClient2, cluster1.GetClusterID()); err != nil {
return err
}

// Delete foreigncluster of cluster2 in cluster1.
if err := cluster1.DeleteForeignCluster(ctx, cluster2.GetClusterID()); err != nil {
return err
Expand Down
20 changes: 19 additions & 1 deletion pkg/liqoctl/unpeeroob/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type Options struct {
ClusterName string
Timeout time.Duration

Incoming bool
// Whether to enforce the peering to be of type out-of-band, and delete the ForeignCluster resource.
UnpeerOOBMode bool
}
Expand All @@ -52,6 +53,15 @@ func (o *Options) Run(ctx context.Context) error {
s.Fail("Failed unpeering clusters: ", output.PrettyErr(err))
return err
}

if o.Incoming {
s.Success("Incoming peering marked as disabled")
if err = o.wait(ctx, &fc.Spec.ClusterIdentity); err != nil {
return err
}
return nil
}

s.Success("Outgoing peering marked as disabled")

if err = o.wait(ctx, &fc.Spec.ClusterIdentity); err != nil {
Expand Down Expand Up @@ -88,8 +98,13 @@ func (o *Options) unpeer(ctx context.Context) (*discoveryv1alpha1.ForeignCluster
return nil, fmt.Errorf("the peering type towards remote cluster %q is %s, expected %s",
o.ClusterName, foreignCluster.Spec.PeeringType, discoveryv1alpha1.PeeringTypeOutOfBand)
}

if o.Incoming {
foreignCluster.Spec.IncomingPeeringEnabled = discoveryv1alpha1.PeeringEnabledNo
} else {
foreignCluster.Spec.OutgoingPeeringEnabled = discoveryv1alpha1.PeeringEnabledNo
}

foreignCluster.Spec.OutgoingPeeringEnabled = discoveryv1alpha1.PeeringEnabledNo
if err := o.CRClient.Update(ctx, &foreignCluster); err != nil {
return nil, err
}
Expand All @@ -111,5 +126,8 @@ func (o *Options) delete(ctx context.Context, fc *discoveryv1alpha1.ForeignClust

func (o *Options) wait(ctx context.Context, remoteClusterID *discoveryv1alpha1.ClusterIdentity) error {
waiter := wait.NewWaiterFromFactory(o.Factory)
if o.Incoming {
return waiter.ForIncomingUnpeering(ctx, remoteClusterID)
}
return waiter.ForOutgoingUnpeering(ctx, remoteClusterID)
}
29 changes: 29 additions & 0 deletions pkg/liqoctl/wait/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,20 @@ func (w *Waiter) ForOutgoingUnpeering(ctx context.Context, remoteClusterID *disc
return nil
}

// ForIncomingUnpeering waits until the status on the foreiglcusters resource states that the incoming peering has been successfully
// set to None or the timeout expires.
func (w *Waiter) ForIncomingUnpeering(ctx context.Context, remoteClusterID *discoveryv1alpha1.ClusterIdentity) error {
remName := remoteClusterID.ClusterName
s := w.Printer.StartSpinner(fmt.Sprintf("Disabling incoming peering to the remote cluster %q", remName))
err := fcutils.PollForEvent(ctx, w.CRClient, remoteClusterID, fcutils.IsIncomingPeeringNo, 1*time.Second)
if client.IgnoreNotFound(err) != nil {
s.Fail(fmt.Sprintf("Failed disabling incoming peering to the remote cluster %q: %s", remName, output.PrettyErr(err)))
return err
}
s.Success(fmt.Sprintf("Successfully disabled incoming peering to the remote cluster %q", remName))
return nil
}

// ForAuth waits until the authentication has been established with the remote cluster or the timeout expires.
func (w *Waiter) ForAuth(ctx context.Context, remoteClusterID *discoveryv1alpha1.ClusterIdentity) error {
remName := remoteClusterID.ClusterName
Expand Down Expand Up @@ -116,6 +130,21 @@ func (w *Waiter) ForOutgoingPeering(ctx context.Context, remoteClusterID *discov
return nil
}


// ForIncomingPeering waits until the status on the foreiglcusters resource states that the incoming peering has been successfully
// set to Yes or the timeout expires.
func (w *Waiter) ForIncomingPeering(ctx context.Context, remoteClusterID *discoveryv1alpha1.ClusterIdentity) error {
remName := remoteClusterID.ClusterName
s := w.Printer.StartSpinner(fmt.Sprintf("Activating incoming peering to the remote cluster %q", remName))
err := fcutils.PollForEvent(ctx, w.CRClient, remoteClusterID, fcutils.IsIncomingPeeringYes, 1*time.Second)
if err != nil {
s.Fail(fmt.Sprintf("Failed activating outgoing peering to the remote cluster %q: %s", remName, output.PrettyErr(err)))
return err
}
s.Success(fmt.Sprintf("Incoming peering activated to the remote cluster %q", remName))
return nil
}

// ForNode waits until the node has been added to the cluster or the timeout expires.
func (w *Waiter) ForNode(ctx context.Context, remoteClusterID *discoveryv1alpha1.ClusterIdentity) error {
remName := remoteClusterID.ClusterName
Expand Down
10 changes: 10 additions & 0 deletions pkg/utils/foreignCluster/peeringStatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,16 @@ func IsIncomingPeeringNone(foreignCluster *discoveryv1alpha1.ForeignCluster) boo
return curPhase == discoveryv1alpha1.PeeringConditionStatusNone
}

// IsIncomingPeeringYes checks if the incoming peering is set to Yes.
func IsIncomingPeeringYes(foreignCluster *discoveryv1alpha1.ForeignCluster) bool {
return foreignCluster.Spec.IncomingPeeringEnabled == discoveryv1alpha1.PeeringEnabledYes
}

// IsIncomingPeeringNo checks if the incoming peering is set to No.
func IsIncomingPeeringNo(foreignCluster *discoveryv1alpha1.ForeignCluster) bool {
return foreignCluster.Spec.IncomingPeeringEnabled == discoveryv1alpha1.PeeringEnabledNo
}

// IsOutgoingPeeringNone checks if the outgoing peering is set to none.
func IsOutgoingPeeringNone(foreignCluster *discoveryv1alpha1.ForeignCluster) bool {
curPhase := peeringconditionsutils.GetStatus(foreignCluster, discoveryv1alpha1.OutgoingPeeringCondition)
Expand Down

0 comments on commit 9047acc

Please sign in to comment.