Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix fine-grained blocking queries for Catalog.NodeServices #12399

Merged
merged 4 commits into from
Jun 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/12399.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
catalog: Add per-node indexes to reduce watchset firing for unrelated nodes and services.
```
52 changes: 37 additions & 15 deletions agent/consul/state/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,15 @@ import (
"github.com/hashicorp/consul/types"
)

// indexServiceExtinction keeps track of the last raft index when the last instance
// of any service was unregistered. This is used by blocking queries on missing services.
const indexServiceExtinction = "service_last_extinction"
const (
// indexServiceExtinction keeps track of the last raft index when the last instance
// of any service was unregistered. This is used by blocking queries on missing services.
indexServiceExtinction = "service_last_extinction"

// indexNodeExtinction keeps track of the last raft index when the last instance
// of any node was unregistered. This is used by blocking queries on missing nodes.
indexNodeExtinction = "node_last_extinction"
)

const (
// minUUIDLookupLen is used as a minimum length of a node name required before
Expand Down Expand Up @@ -414,8 +420,8 @@ func (s *Store) ensureNodeTxn(tx WriteTxn, idx uint64, preserveIndexes bool, nod
// We are actually renaming a node, remove its reference first
err := s.deleteNodeTxn(tx, idx, n.Node, n.GetEnterpriseMeta(), n.PeerName)
if err != nil {
return fmt.Errorf("Error while renaming Node ID: %q (%s) from %s to %s",
node.ID, node.Address, n.Node, node.Node)
return fmt.Errorf("Error while renaming Node ID: %q (%s) from %s to %s: %w",
node.ID, node.Address, n.Node, node.Node, err)
}
}
} else {
Expand Down Expand Up @@ -764,6 +770,15 @@ func (s *Store) deleteNodeTxn(tx WriteTxn, idx uint64, nodeName string, entMeta
return fmt.Errorf("failed updating index: %s", err)
}

// Clean up node entry from index table
if err := tx.Delete(tableIndex, &IndexEntry{Key: nodeIndexName(nodeName, entMeta, node.PeerName)}); err != nil {
return fmt.Errorf("failed deleting nodeIndex %q: %w", nodeIndexName(nodeName, entMeta, node.PeerName), err)
}

if err := catalogUpdateNodeExtinctionIndex(tx, idx, entMeta, node.PeerName); err != nil {
return err
}

if peerName == "" {
// Invalidate any sessions for this node.
toDelete, err := allNodeSessionsTxn(tx, nodeName, entMeta.PartitionOrDefault())
Expand Down Expand Up @@ -1683,9 +1698,6 @@ func (s *Store) nodeServices(ws memdb.WatchSet, nodeNameOrID string, entMeta *ac
entMeta = structs.DefaultEnterpriseMetaInDefaultPartition()
}

// Get the table index.
idx := catalogMaxIndex(tx, entMeta, peerName, false)

// Query the node by node name
watchCh, n, err := tx.FirstWatch(tableNodes, indexID, Query{
Value: nodeNameOrID,
Expand All @@ -1712,16 +1724,16 @@ func (s *Store) nodeServices(ws memdb.WatchSet, nodeNameOrID string, entMeta *ac
})
if err != nil {
ws.Add(watchCh)
// TODO(sean@): We could/should log an error re: the uuid_prefix lookup
// failing once a logger has been introduced to the catalog.
return true, 0, nil, nil, nil
idx := catalogNodeLastExtinctionIndex(tx, entMeta, peerName)
return true, idx, nil, nil, nil
}

n = iter.Next()
if n == nil {
// No nodes matched, even with the Node ID: add a watch on the node name.
ws.Add(watchCh)
return true, 0, nil, nil, nil
idx := catalogNodeLastExtinctionIndex(tx, entMeta, peerName)
return true, idx, nil, nil, nil
}

idWatchCh := iter.WatchCh()
Expand All @@ -1745,6 +1757,9 @@ func (s *Store) nodeServices(ws memdb.WatchSet, nodeNameOrID string, entMeta *ac
}
ws.Add(services.WatchCh())

// Get the table index.
idx := catalogNodeMaxIndex(tx, nodeName, entMeta, peerName)

return false, idx, node, services, nil
}

Expand Down Expand Up @@ -1902,10 +1917,17 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st

svc := service.(*structs.ServiceNode)
if err := catalogUpdateServicesIndexes(tx, idx, entMeta, svc.PeerName); err != nil {
return err
return fmt.Errorf("failed updating services indexes: %w", err)
}
if err := catalogUpdateServiceKindIndexes(tx, idx, svc.ServiceKind, &svc.EnterpriseMeta, svc.PeerName); err != nil {
return err
return fmt.Errorf("failed updating service-kind indexes: %w", err)
}
// Update the node indexes as the service information is included in node catalog queries.
if err := catalogUpdateNodesIndexes(tx, idx, entMeta, peerName); err != nil {
return fmt.Errorf("failed updating nodes indexes: %w", err)
}
if err := catalogUpdateNodeIndexes(tx, idx, nodeName, entMeta, peerName); err != nil {
return fmt.Errorf("failed updating node indexes: %w", err)
}

name := svc.CompoundServiceName()
Expand All @@ -1930,7 +1952,7 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st
_, serviceIndex, err := catalogServiceMaxIndex(tx, svc.ServiceName, entMeta, svc.PeerName)
if err == nil && serviceIndex != nil {
// we found service.<serviceName> index, garbage collect it
if errW := tx.Delete(tableIndex, serviceIndex); errW != nil {
if err := tx.Delete(tableIndex, serviceIndex); err != nil {
return fmt.Errorf("[FAILED] deleting serviceIndex %s: %s", svc.ServiceName, err)
}
}
Expand Down
60 changes: 47 additions & 13 deletions agent/consul/state/catalog_oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"fmt"
"strings"

memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-memdb"

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs"
Expand All @@ -24,8 +24,12 @@ func serviceKindIndexName(kind structs.ServiceKind, _ *acl.EnterpriseMeta, peerN
return peeredIndexEntryName(base, peerName)
}

func nodeIndexName(name string, _ *acl.EnterpriseMeta, peerName string) string {
return peeredIndexEntryName(fmt.Sprintf("node.%s", name), peerName)
}

func catalogUpdateNodesIndexes(tx WriteTxn, idx uint64, _ *acl.EnterpriseMeta, peerName string) error {
// overall nodes index
// overall nodes index for snapshot and ListNodes RPC
if err := indexUpdateMaxTxn(tx, idx, tableNodes); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
Expand All @@ -38,12 +42,22 @@ func catalogUpdateNodesIndexes(tx WriteTxn, idx uint64, _ *acl.EnterpriseMeta, p
return nil
}

// catalogUpdateNodeIndexes upserts the max index for a single node
func catalogUpdateNodeIndexes(tx WriteTxn, idx uint64, nodeName string, _ *acl.EnterpriseMeta, peerName string) error {
// per-node index
if err := indexUpdateMaxTxn(tx, idx, nodeIndexName(nodeName, nil, peerName)); err != nil {
return fmt.Errorf("failed updating node index: %w", err)
}

return nil
}

// catalogUpdateServicesIndexes upserts the max index for the entire services table with varying levels
// of granularity (no-op if `idx` is lower than what exists for that index key):
// - all services
// - all services in a specified peer (including internal)
func catalogUpdateServicesIndexes(tx WriteTxn, idx uint64, _ *acl.EnterpriseMeta, peerName string) error {
// overall services index
// overall services index for snapshot
if err := indexUpdateMaxTxn(tx, idx, tableServices); err != nil {
return fmt.Errorf("failed updating index for services table: %w", err)
}
Expand Down Expand Up @@ -84,14 +98,16 @@ func catalogUpdateServiceIndexes(tx WriteTxn, idx uint64, serviceName string, _
}

func catalogUpdateServiceExtinctionIndex(tx WriteTxn, idx uint64, _ *acl.EnterpriseMeta, peerName string) error {
if err := indexUpdateMaxTxn(tx, idx, indexServiceExtinction); err != nil {
return fmt.Errorf("failed updating missing service extinction index: %w", err)
}
// update the peer index
if err := indexUpdateMaxTxn(tx, idx, peeredIndexEntryName(indexServiceExtinction, peerName)); err != nil {
return fmt.Errorf("failed updating missing service extinction peered index: %w", err)
}
return nil
}

func catalogUpdateNodeExtinctionIndex(tx WriteTxn, idx uint64, _ *acl.EnterpriseMeta, peerName string) error {
if err := indexUpdateMaxTxn(tx, idx, peeredIndexEntryName(indexNodeExtinction, peerName)); err != nil {
return fmt.Errorf("failed updating missing node extinction peered index: %w", err)
}
return nil
}

Expand All @@ -105,7 +121,10 @@ func catalogInsertNode(tx WriteTxn, node *structs.Node) error {
}

if err := catalogUpdateNodesIndexes(tx, node.ModifyIndex, node.GetEnterpriseMeta(), node.PeerName); err != nil {
return err
return fmt.Errorf("failed updating nodes indexes: %w", err)
}
if err := catalogUpdateNodeIndexes(tx, node.ModifyIndex, node.Node, node.GetEnterpriseMeta(), node.PeerName); err != nil {
return fmt.Errorf("failed updating node indexes: %w", err)
}

// Update the node's service indexes as the node information is included
Expand All @@ -125,15 +144,23 @@ func catalogInsertService(tx WriteTxn, svc *structs.ServiceNode) error {
}

if err := catalogUpdateServicesIndexes(tx, svc.ModifyIndex, &svc.EnterpriseMeta, svc.PeerName); err != nil {
return err
return fmt.Errorf("failed updating services indexes: %w", err)
}

if err := catalogUpdateServiceIndexes(tx, svc.ModifyIndex, svc.ServiceName, &svc.EnterpriseMeta, svc.PeerName); err != nil {
return err
return fmt.Errorf("failed updating service indexes: %w", err)
}

if err := catalogUpdateServiceKindIndexes(tx, svc.ModifyIndex, svc.ServiceKind, &svc.EnterpriseMeta, svc.PeerName); err != nil {
return err
return fmt.Errorf("failed updating service-kind indexes: %w", err)
}

// Update the node indexes as the service information is included in node catalog queries.
if err := catalogUpdateNodesIndexes(tx, svc.ModifyIndex, &svc.EnterpriseMeta, svc.PeerName); err != nil {
return fmt.Errorf("failed updating nodes indexes: %w", err)
}
if err := catalogUpdateNodeIndexes(tx, svc.ModifyIndex, svc.Node, &svc.EnterpriseMeta, svc.PeerName); err != nil {
return fmt.Errorf("failed updating node indexes: %w", err)
}

return nil
Expand All @@ -143,6 +170,14 @@ func catalogNodesMaxIndex(tx ReadTxn, _ *acl.EnterpriseMeta, peerName string) ui
return maxIndexTxn(tx, peeredIndexEntryName(tableNodes, peerName))
}

func catalogNodeMaxIndex(tx ReadTxn, nodeName string, _ *acl.EnterpriseMeta, peerName string) uint64 {
return maxIndexTxn(tx, nodeIndexName(nodeName, nil, peerName))
}

func catalogNodeLastExtinctionIndex(tx ReadTxn, _ *acl.EnterpriseMeta, peerName string) uint64 {
return maxIndexTxn(tx, peeredIndexEntryName(indexNodeExtinction, peerName))
}

func catalogServicesMaxIndex(tx ReadTxn, _ *acl.EnterpriseMeta, peerName string) uint64 {
return maxIndexTxn(tx, peeredIndexEntryName(tableServices, peerName))
}
Expand Down Expand Up @@ -185,7 +220,6 @@ func catalogMaxIndex(tx ReadTxn, _ *acl.EnterpriseMeta, peerName string, checks
}

func catalogMaxIndexWatch(tx ReadTxn, ws memdb.WatchSet, _ *acl.EnterpriseMeta, peerName string, checks bool) uint64 {
// TODO(peering_indexes): pipe peerName here
if checks {
return maxIndexWatchTxn(tx, ws,
peeredIndexEntryName(tableChecks, peerName),
Expand All @@ -200,7 +234,7 @@ func catalogMaxIndexWatch(tx ReadTxn, ws memdb.WatchSet, _ *acl.EnterpriseMeta,
}

func catalogUpdateCheckIndexes(tx WriteTxn, idx uint64, _ *acl.EnterpriseMeta, peerName string) error {
// update the universal index entry
// update the overall index entry for snapshot
if err := indexUpdateMaxTxn(tx, idx, tableChecks); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
Expand Down
Loading