Skip to content

Commit

Permalink
Use extinction index to track deleted nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
wjordan committed Mar 1, 2022
1 parent 3cc8970 commit 440db29
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 5 deletions.
23 changes: 18 additions & 5 deletions agent/consul/state/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
// of any service was unregistered. This is used by blocking queries on missing services.
const indexServiceExtinction = "service_last_extinction"

const indexNodeExtinction = "node_last_extinction"

const (
// minUUIDLookupLen is used as a minimum length of a node name required before
// we test to see if the name is actually a UUID and perform an ID-based node
Expand Down Expand Up @@ -676,8 +678,16 @@ func (s *Store) deleteNodeTxn(tx WriteTxn, idx uint64, nodeName string, entMeta
if err := catalogUpdateNodesIndexes(tx, idx, entMeta); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
if err := catalogUpdateNodeIndexes(tx, nodeName, idx, entMeta); err != nil {
return fmt.Errorf("failed updating node index: %s", err)
// Cleanup the node.<nodeName> index
_, nodeIndex, err := catalogNodeWatchIndex(tx, nodeName, entMeta)
if err == nil && nodeIndex != nil {
// we found node.<nodeName> index, garbage collect it
if errW := tx.Delete(tableIndex, nodeIndex); errW != nil {
return fmt.Errorf("[FAILED] deleting nodeIndex %s: %s", nodeName, err)
}
}
if err := catalogUpdateNodeExtinctionIndex(tx, idx, entMeta); err != nil {
return err
}

// Invalidate any sessions for this node.
Expand Down Expand Up @@ -1508,7 +1518,8 @@ func (s *Store) nodeServices(ws memdb.WatchSet, nodeNameOrID string, entMeta *st
} else {
if len(nodeNameOrID) < minUUIDLookupLen {
ws.Add(watchCh)
return true, 0, nil, nil, nil
idx := catalogNodeMaxExtinctionIndex(tx, entMeta)
return true, idx, nil, nil, nil
}

// Attempt to lookup the node by its node ID
Expand All @@ -1520,14 +1531,16 @@ func (s *Store) nodeServices(ws memdb.WatchSet, nodeNameOrID string, entMeta *st
ws.Add(watchCh)
// TODO(sean@): We could/should log an error re: the uuid_prefix lookup
// failing once a logger has been introduced to the catalog.
return true, 0, nil, nil, nil
idx := catalogNodeMaxExtinctionIndex(tx, entMeta)
return true, idx, nil, nil, nil
}

n = iter.Next()
if n == nil {
// No nodes matched, even with the Node ID: add a watch on the node name.
ws.Add(watchCh)
return true, 0, nil, nil, nil
idx := catalogNodeMaxExtinctionIndex(tx, entMeta)
return true, idx, nil, nil, nil
}

idWatchCh := iter.WatchCh()
Expand Down
15 changes: 15 additions & 0 deletions agent/consul/state/catalog_oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,13 @@ func catalogUpdateServiceExtinctionIndex(tx WriteTxn, idx uint64, _ *structs.Ent
return nil
}

func catalogUpdateNodeExtinctionIndex(tx WriteTxn, idx uint64, _ *structs.EnterpriseMeta) error {
if err := tx.Insert(tableIndex, &IndexEntry{indexNodeExtinction, idx}); err != nil {
return fmt.Errorf("failed updating missing node extinction index: %s", err)
}
return nil
}

func catalogInsertNode(tx WriteTxn, node *structs.Node) error {
// ensure that the Partition is always clear within the state store in OSS
node.Partition = ""
Expand Down Expand Up @@ -139,6 +146,14 @@ func catalogNodeMaxIndex(tx ReadTxn, nodeName string, _ *structs.EnterpriseMeta)
return maxIndexTxn(tx, nodeIndexName(nodeName, nil))
}

func catalogNodeWatchIndex(tx ReadTxn, nodeName string, _ *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) {
return tx.FirstWatch(tableIndex, "id", nodeIndexName(nodeName, nil))
}

func catalogNodeMaxExtinctionIndex(tx ReadTxn, _ *structs.EnterpriseMeta) uint64 {
return maxIndexTxn(tx, indexNodeExtinction)
}

func catalogServicesMaxIndex(tx ReadTxn, _ *structs.EnterpriseMeta) uint64 {
return maxIndexTxn(tx, tableServices)
}
Expand Down

0 comments on commit 440db29

Please sign in to comment.