Skip to content

Commit

Permalink
Make sync failures fatal after retries
Browse files Browse the repository at this point in the history
Upon starting, failures when syncing OVN DB with K8 should
be considered fatal. Still, this change will introduce
retry logic to minimize pod restarts.

Signed-off-by: Flavio Fernandes <flaviof@redhat.com>
  • Loading branch information
flavio-fernandes committed Jan 31, 2022
1 parent d92eab2 commit af27b80
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 57 deletions.
25 changes: 14 additions & 11 deletions go-controller/pkg/ovn/egressfirewall.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,29 +97,32 @@ func newEgressFirewallRule(rawEgressFirewallRule egressfirewallapi.EgressFirewal

// NOTE: Utilize the fact that we know that all egress firewall related setup must have a priority: types.MinimumReservedEgressFirewallPriority <= priority <= types.EgressFirewallStartPriority
func (oc *Controller) syncEgressFirewall(egressFirewalls []interface{}) {
oc.syncWithRetry("syncEgressFirewall", func() error { return oc.syncEgressFirewallRetriable(egressFirewalls) })
}

// This function implements the main body of work of what is described by syncEgressFirewall.
// Upon failure, it may be invoked multiple times in order to avoid a pod restart.
func (oc *Controller) syncEgressFirewallRetriable(egressFirewalls []interface{}) error {
// Lookup all ACLs used for egress Firewalls
egressFirewallACLs, err := libovsdbops.FindACLsByPriorityRange(oc.nbClient, types.MinimumReservedEgressFirewallPriority, types.EgressFirewallStartPriority)
if err != nil {
klog.Errorf("Unable to list egress firewall ACLs, cannot cleanup old stale data, err: %v", err)
return
return fmt.Errorf("unable to list egress firewall ACLs, cannot cleanup old stale data, err: %v", err)
}

if config.Gateway.Mode == config.GatewayModeShared {
// Mode is shared gateway mode, make sure to delete all egfw ACLs on the node switches
if len(egressFirewallACLs) != 0 {
err = libovsdbops.RemoveACLsFromNodeSwitches(oc.nbClient, egressFirewallACLs)
if err != nil {
klog.Errorf("Failed to remove reject acl from node logical switches: %v", err)
return
return fmt.Errorf("failed to remove reject acl from node logical switches: %v", err)
}
}
} else if config.Gateway.Mode == config.GatewayModeLocal {
// Mode is local gateway mode, make sure to delete all egfw ACLs on the join switches
if len(egressFirewallACLs) != 0 {
err = libovsdbops.RemoveACLsFromJoinSwitch(oc.nbClient, egressFirewallACLs)
if err != nil {
klog.Errorf("Failed to remove reject acl from node logical switches: %v", err)
return
return fmt.Errorf("failed to remove reject acl from node logical switches: %v", err)
}
}
}
Expand All @@ -143,7 +146,7 @@ func (oc *Controller) syncEgressFirewall(egressFirewalls []interface{}) {
})
}
if _, err := oc.modelClient.CreateOrUpdate(opModels...); err != nil {
klog.Errorf("Unable to set ACL direction on egress firewall acls, cannot convert old ACL data err: %v", err)
return fmt.Errorf("unable to set ACL direction on egress firewall acls, cannot convert old ACL data err: %v", err)
}
}
// In any gateway mode, make sure to delete all LRPs on ovn_cluster_router.
Expand All @@ -170,7 +173,7 @@ func (oc *Controller) syncEgressFirewall(egressFirewalls []interface{}) {
},
}
if err := oc.modelClient.Delete(opModels...); err != nil {
klog.Errorf("Unable to remove egress firewall policy, cannot cleanup old stale data, err: %v", err)
return fmt.Errorf("unable to remove egress firewall policy, cannot cleanup old stale data, err: %v", err)
}

// sync the ovn and k8s egressFirewall states
Expand All @@ -189,7 +192,7 @@ func (oc *Controller) syncEgressFirewall(egressFirewalls []interface{}) {
// get all the k8s EgressFirewall Objects
egressFirewallList, err := oc.kube.GetEgressFirewalls()
if err != nil {
klog.Errorf("Cannot reconcile the state of egressfirewalls in ovn database and k8s. err: %v", err)
return fmt.Errorf("cannot reconcile the state of egressfirewalls in ovn database and k8s. err: %v", err)
}
// delete entries from the map that exist in k8s and ovn
for _, egressFirewall := range egressFirewallList.Items {
Expand All @@ -199,10 +202,10 @@ func (oc *Controller) syncEgressFirewall(egressFirewalls []interface{}) {
for spuriousEF := range ovnEgressFirewalls {
err := oc.deleteEgressFirewallRules(spuriousEF)
if err != nil {
klog.Errorf("Cannot fully reconcile the state of egressfirewalls ACLs for namespace %s still exist in ovn db: %v", spuriousEF, err)
return
return fmt.Errorf("cannot fully reconcile the state of egressfirewalls ACLs for namespace %s still exist in ovn db: %v", spuriousEF, err)
}
}
return nil
}

func (oc *Controller) addEgressFirewall(egressFirewall *egressfirewallapi.EgressFirewall) error {
Expand Down
47 changes: 34 additions & 13 deletions go-controller/pkg/ovn/egressip.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -999,13 +1000,25 @@ func (oc *Controller) syncEgressIPs(eIPs []interface{}) {
// - Egress IPs which have been deleted while ovnkube-master was down
// - pods/namespaces which have stopped matching on egress IPs while
// ovnkube-master was down
if egressIPToPodIPCache, err := oc.generatePodIPCacheForEgressIP(eIPs); err == nil {
oc.syncStaleEgressReroutePolicy(egressIPToPodIPCache)
oc.syncStaleSNATRules(egressIPToPodIPCache)
}
oc.syncWithRetry("syncEgressIPs", func() error {
egressIPToPodIPCache, err := oc.generatePodIPCacheForEgressIP(eIPs)
if err != nil {
return fmt.Errorf("syncEgressIPs unable to generate cache for egressip: %v", err)
}
if err = oc.syncStaleEgressReroutePolicy(egressIPToPodIPCache); err != nil {
return fmt.Errorf("syncEgressIPs unable to remove stale reroute policies: %v", err)
}
if err = oc.syncStaleSNATRules(egressIPToPodIPCache); err != nil {
return fmt.Errorf("syncEgressIPs unable to remove stale nats: %v", err)
}
return nil
})
}

func (oc *Controller) syncStaleEgressReroutePolicy(egressIPToPodIPCache map[string]sets.String) {
// This function implements a portion of syncEgressIPs.
// It removes OVN logical router policies used by EgressIPs deleted while ovnkube-master was down.
// Upon failure, it may be invoked multiple times in order to avoid a pod restart.
func (oc *Controller) syncStaleEgressReroutePolicy(egressIPToPodIPCache map[string]sets.String) error {
logicalRouter := nbdb.LogicalRouter{}
logicalRouterPolicyRes := []nbdb.LogicalRouterPolicy{}
opModels := []libovsdbops.OperationModel{
Expand Down Expand Up @@ -1040,11 +1053,15 @@ func (oc *Controller) syncStaleEgressReroutePolicy(egressIPToPodIPCache map[stri
},
}
if err := oc.modelClient.Delete(opModels...); err != nil {
klog.Errorf("Unable to remove stale logical router policies, err: %v", err)
return fmt.Errorf("unable to remove stale logical router policies, err: %v", err)
}
return nil
}

func (oc *Controller) syncStaleSNATRules(egressIPToPodIPCache map[string]sets.String) {
// This function implements a portion of syncEgressIPs.
// It removes OVN NAT rules used by EgressIPs deleted while ovnkube-master was down.
// Upon failure, it may be invoked multiple times in order to avoid a pod restart.
func (oc *Controller) syncStaleSNATRules(egressIPToPodIPCache map[string]sets.String) error {
predicate := func(item *nbdb.NAT) bool {
egressIPName, exists := item.ExternalIDs["name"]
// Exclude rows that have no name or are not the right type
Expand All @@ -1062,34 +1079,38 @@ func (oc *Controller) syncStaleSNATRules(egressIPToPodIPCache map[string]sets.St

nats, err := libovsdbops.FindNATsUsingPredicate(oc.nbClient, predicate)
if err != nil {
klog.Errorf("Unable to sync egress IPs err: %v", err)
return
return fmt.Errorf("unable to sync egress IPs err: %v", err)
}

if len(nats) == 0 {
// No stale nat entries to deal with: noop.
return
return nil
}

routers, err := libovsdbops.FindRoutersUsingNAT(oc.nbClient, nats)
if err != nil {
klog.Errorf("Unable to sync egress IPs, err: %v", err)
return
return fmt.Errorf("unable to sync egress IPs, err: %v", err)
}

var errors []error
ops := []ovsdb.Operation{}
for _, router := range routers {
ops, err = libovsdbops.DeleteNATsFromRouterOps(oc.nbClient, ops, &router, nats...)
if err != nil {
klog.Errorf("Error deleting stale NAT from router %s: %v", router.Name, err)
errors = append(errors, err)
continue
}
}
if len(errors) > 0 {
return fmt.Errorf("failed deleting stale NAT: %v", utilerrors.NewAggregate(errors))
}

_, err = libovsdbops.TransactAndCheck(oc.nbClient, ops)
if err != nil {
klog.Errorf("Error deleting stale NATs: %v", err)
return fmt.Errorf("error deleting stale NATs: %v", err)
}
return nil
}

// generatePodIPCacheForEgressIP builds a cache of egressIP name -> podIPs for fast
Expand Down
37 changes: 29 additions & 8 deletions go-controller/pkg/ovn/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -1360,17 +1360,36 @@ func (oc *Controller) syncNodesPeriodic() {
}
}

// syncWithRetry is a wrapper that calls a sync function and retries it in case of failures.
func (oc *Controller) syncWithRetry(syncName string, syncFunc func() error) {
err := utilwait.PollImmediate(500*time.Millisecond, 60*time.Second, func() (bool, error) {
if err := syncFunc(); err != nil {
klog.Errorf("Failed (will retry) in syncing %s: %v", syncName, err)
return false, nil
}
return true, nil
})
if err != nil {
klog.Fatalf("Error in syncing %s: %v", syncName, err)
}
}

// We only deal with cleaning up nodes that shouldn't exist here, since
// watchNodes() will be called for all existing nodes at startup anyway.
// Note that this list will include the 'join' cluster switch, which we
// do not want to delete.
func (oc *Controller) syncNodes(nodes []interface{}) {
oc.syncWithRetry("syncNodes", func() error { return oc.syncNodesRetriable(nodes) })
}

// This function implements the main body of work of what is described by syncNodes.
// Upon failure, it may be invoked multiple times in order to avoid a pod restart.
func (oc *Controller) syncNodesRetriable(nodes []interface{}) error {
foundNodes := sets.NewString()
for _, tmp := range nodes {
node, ok := tmp.(*kapi.Node)
if !ok {
klog.Errorf("Spurious object in syncNodes: %v", tmp)
continue
return fmt.Errorf("spurious object in syncNodes: %v", tmp)
}
foundNodes.Insert(node.Name)

Expand All @@ -1387,6 +1406,7 @@ func (oc *Controller) syncNodes(nodes []interface{}) {
// For each existing node, reserve its joinSwitch LRP IPs if they already exist.
_, err := oc.joinSwIPManager.EnsureJoinLRPIPs(node.Name)
if err != nil {
// TODO (flaviof): keep going even if EnsureJoinLRPIPs returned an error. Maybe we should not.
klog.Errorf("Failed to get join switch port IP address for node %s: %v", node.Name, err)
}
}
Expand All @@ -1396,8 +1416,7 @@ func (oc *Controller) syncNodes(nodes []interface{}) {

chassisList, err := libovsdbops.ListChassis(oc.sbClient)
if err != nil {
klog.Errorf("Failed to get chassis list: error: %v", err)
return
return fmt.Errorf("failed to get chassis list: error: %v", err)
}

for _, chassis := range chassisList {
Expand All @@ -1409,8 +1428,10 @@ func (oc *Controller) syncNodes(nodes []interface{}) {

nodeSwitches, err := libovsdbops.FindSwitchesWithOtherConfig(oc.nbClient)
if err != nil {
klog.Errorf("Failed to get node logical switches which have other-config set error: %v", err)
return
if err != libovsdbclient.ErrNotFound {
return fmt.Errorf("failed to get node logical switches which have other-config set error: %v", err)
}
klog.Warning("Did not find any logical switches with other-config")
}

for _, nodeSwitch := range nodeSwitches {
Expand Down Expand Up @@ -1449,7 +1470,7 @@ func (oc *Controller) syncNodes(nodes []interface{}) {
}

if err := libovsdbops.DeleteNodeChassis(oc.sbClient, staleChassis.List()...); err != nil {
klog.Errorf("Failed Deleting chassis %v error: %v", staleChassis.List(), err)
return
return fmt.Errorf("failed deleting chassis %v error: %v", staleChassis.List(), err)
}
return nil
}
12 changes: 9 additions & 3 deletions go-controller/pkg/ovn/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,17 @@ const (
)

func (oc *Controller) syncNamespaces(namespaces []interface{}) {
oc.syncWithRetry("syncNamespaces", func() error { return oc.syncNamespacesRetriable(namespaces) })
}

// This function implements the main body of work of syncNamespaces.
// Upon failure, it may be invoked multiple times in order to avoid a pod restart.
func (oc *Controller) syncNamespacesRetriable(namespaces []interface{}) error {
expectedNs := make(map[string]bool)
for _, nsInterface := range namespaces {
ns, ok := nsInterface.(*kapi.Namespace)
if !ok {
klog.Errorf("Spurious object in syncNamespaces: %v", nsInterface)
continue
return fmt.Errorf("spurious object in syncNamespaces: %v", nsInterface)
}
expectedNs[ns.Name] = true
}
Expand All @@ -50,8 +55,9 @@ func (oc *Controller) syncNamespaces(namespaces []interface{}) {
return nil
})
if err != nil {
klog.Errorf("Error in syncing namespaces: %v", err)
return fmt.Errorf("error in syncing namespaces: %v", err)
}
return nil
}

func (oc *Controller) getRoutingExternalGWs(nsInfo *namespaceInfo) *gatewayInfo {
Expand Down

0 comments on commit af27b80

Please sign in to comment.