Skip to content

Commit

Permalink
Add per-node max indexes (#12399)
Browse files Browse the repository at this point in the history
Adds fine-grained node.[node] entries to the index table, allowing blocking queries to return fine-grained indexes that prevent them from returning immediately when unrelated nodes/services are updated.

Co-authored-by: kisunji <ckim@hashicorp.com>
  • Loading branch information
wjordan and kisunji committed Jun 23, 2022
1 parent ba89a7d commit 34ecbc1
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 85 deletions.
3 changes: 3 additions & 0 deletions .changelog/12399.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
catalog: Add per-node indexes to reduce watchset firing for unrelated nodes and services.
```
52 changes: 37 additions & 15 deletions agent/consul/state/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,15 @@ import (
"github.com/hashicorp/consul/types"
)

// indexServiceExtinction keeps track of the last raft index when the last instance
// of any service was unregistered. This is used by blocking queries on missing services.
const indexServiceExtinction = "service_last_extinction"
const (
// indexServiceExtinction keeps track of the last raft index when the last instance
// of any service was unregistered. This is used by blocking queries on missing services.
indexServiceExtinction = "service_last_extinction"

// indexNodeExtinction keeps track of the last raft index when the last instance
// of any node was unregistered. This is used by blocking queries on missing nodes.
indexNodeExtinction = "node_last_extinction"
)

const (
// minUUIDLookupLen is used as a minimum length of a node name required before
Expand Down Expand Up @@ -414,8 +420,8 @@ func (s *Store) ensureNodeTxn(tx WriteTxn, idx uint64, preserveIndexes bool, nod
// We are actually renaming a node, remove its reference first
err := s.deleteNodeTxn(tx, idx, n.Node, n.GetEnterpriseMeta(), n.PeerName)
if err != nil {
return fmt.Errorf("Error while renaming Node ID: %q (%s) from %s to %s",
node.ID, node.Address, n.Node, node.Node)
return fmt.Errorf("Error while renaming Node ID: %q (%s) from %s to %s: %w",
node.ID, node.Address, n.Node, node.Node, err)
}
}
} else {
Expand Down Expand Up @@ -764,6 +770,15 @@ func (s *Store) deleteNodeTxn(tx WriteTxn, idx uint64, nodeName string, entMeta
return fmt.Errorf("failed updating index: %s", err)
}

// Clean up node entry from index table
if err := tx.Delete(tableIndex, &IndexEntry{Key: nodeIndexName(nodeName, entMeta, node.PeerName)}); err != nil {
return fmt.Errorf("failed deleting nodeIndex %q: %w", nodeIndexName(nodeName, entMeta, node.PeerName), err)
}

if err := catalogUpdateNodeExtinctionIndex(tx, idx, entMeta, node.PeerName); err != nil {
return err
}

if peerName == "" {
// Invalidate any sessions for this node.
toDelete, err := allNodeSessionsTxn(tx, nodeName, entMeta.PartitionOrDefault())
Expand Down Expand Up @@ -1683,9 +1698,6 @@ func (s *Store) nodeServices(ws memdb.WatchSet, nodeNameOrID string, entMeta *ac
entMeta = structs.DefaultEnterpriseMetaInDefaultPartition()
}

// Get the table index.
idx := catalogMaxIndex(tx, entMeta, peerName, false)

// Query the node by node name
watchCh, n, err := tx.FirstWatch(tableNodes, indexID, Query{
Value: nodeNameOrID,
Expand All @@ -1712,16 +1724,16 @@ func (s *Store) nodeServices(ws memdb.WatchSet, nodeNameOrID string, entMeta *ac
})
if err != nil {
ws.Add(watchCh)
// TODO(sean@): We could/should log an error re: the uuid_prefix lookup
// failing once a logger has been introduced to the catalog.
return true, 0, nil, nil, nil
idx := catalogNodeLastExtinctionIndex(tx, entMeta, peerName)
return true, idx, nil, nil, nil
}

n = iter.Next()
if n == nil {
// No nodes matched, even with the Node ID: add a watch on the node name.
ws.Add(watchCh)
return true, 0, nil, nil, nil
idx := catalogNodeLastExtinctionIndex(tx, entMeta, peerName)
return true, idx, nil, nil, nil
}

idWatchCh := iter.WatchCh()
Expand All @@ -1745,6 +1757,9 @@ func (s *Store) nodeServices(ws memdb.WatchSet, nodeNameOrID string, entMeta *ac
}
ws.Add(services.WatchCh())

// Get the table index.
idx := catalogNodeMaxIndex(tx, nodeName, entMeta, peerName)

return false, idx, node, services, nil
}

Expand Down Expand Up @@ -1902,10 +1917,17 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st

svc := service.(*structs.ServiceNode)
if err := catalogUpdateServicesIndexes(tx, idx, entMeta, svc.PeerName); err != nil {
return err
return fmt.Errorf("failed updating services indexes: %w", err)
}
if err := catalogUpdateServiceKindIndexes(tx, idx, svc.ServiceKind, &svc.EnterpriseMeta, svc.PeerName); err != nil {
return err
return fmt.Errorf("failed updating service-kind indexes: %w", err)
}
// Update the node indexes as the service information is included in node catalog queries.
if err := catalogUpdateNodesIndexes(tx, idx, entMeta, peerName); err != nil {
return fmt.Errorf("failed updating nodes indexes: %w", err)
}
if err := catalogUpdateNodeIndexes(tx, idx, nodeName, entMeta, peerName); err != nil {
return fmt.Errorf("failed updating node indexes: %w", err)
}

name := svc.CompoundServiceName()
Expand All @@ -1930,7 +1952,7 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st
_, serviceIndex, err := catalogServiceMaxIndex(tx, svc.ServiceName, entMeta, svc.PeerName)
if err == nil && serviceIndex != nil {
// we found service.<serviceName> index, garbage collect it
if errW := tx.Delete(tableIndex, serviceIndex); errW != nil {
if err := tx.Delete(tableIndex, serviceIndex); err != nil {
return fmt.Errorf("[FAILED] deleting serviceIndex %s: %s", svc.ServiceName, err)
}
}
Expand Down
60 changes: 47 additions & 13 deletions agent/consul/state/catalog_oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"fmt"
"strings"

memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-memdb"

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs"
Expand All @@ -24,8 +24,12 @@ func serviceKindIndexName(kind structs.ServiceKind, _ *acl.EnterpriseMeta, peerN
return peeredIndexEntryName(base, peerName)
}

func nodeIndexName(name string, _ *acl.EnterpriseMeta, peerName string) string {
return peeredIndexEntryName(fmt.Sprintf("node.%s", name), peerName)
}

func catalogUpdateNodesIndexes(tx WriteTxn, idx uint64, _ *acl.EnterpriseMeta, peerName string) error {
// overall nodes index
// overall nodes index for snapshot and ListNodes RPC
if err := indexUpdateMaxTxn(tx, idx, tableNodes); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
Expand All @@ -38,12 +42,22 @@ func catalogUpdateNodesIndexes(tx WriteTxn, idx uint64, _ *acl.EnterpriseMeta, p
return nil
}

// catalogUpdateNodeIndexes upserts the max index for a single node
func catalogUpdateNodeIndexes(tx WriteTxn, idx uint64, nodeName string, _ *acl.EnterpriseMeta, peerName string) error {
// per-node index
if err := indexUpdateMaxTxn(tx, idx, nodeIndexName(nodeName, nil, peerName)); err != nil {
return fmt.Errorf("failed updating node index: %w", err)
}

return nil
}

// catalogUpdateServicesIndexes upserts the max index for the entire services table with varying levels
// of granularity (no-op if `idx` is lower than what exists for that index key):
// - all services
// - all services in a specified peer (including internal)
func catalogUpdateServicesIndexes(tx WriteTxn, idx uint64, _ *acl.EnterpriseMeta, peerName string) error {
// overall services index
// overall services index for snapshot
if err := indexUpdateMaxTxn(tx, idx, tableServices); err != nil {
return fmt.Errorf("failed updating index for services table: %w", err)
}
Expand Down Expand Up @@ -84,14 +98,16 @@ func catalogUpdateServiceIndexes(tx WriteTxn, idx uint64, serviceName string, _
}

func catalogUpdateServiceExtinctionIndex(tx WriteTxn, idx uint64, _ *acl.EnterpriseMeta, peerName string) error {
if err := indexUpdateMaxTxn(tx, idx, indexServiceExtinction); err != nil {
return fmt.Errorf("failed updating missing service extinction index: %w", err)
}
// update the peer index
if err := indexUpdateMaxTxn(tx, idx, peeredIndexEntryName(indexServiceExtinction, peerName)); err != nil {
return fmt.Errorf("failed updating missing service extinction peered index: %w", err)
}
return nil
}

func catalogUpdateNodeExtinctionIndex(tx WriteTxn, idx uint64, _ *acl.EnterpriseMeta, peerName string) error {
if err := indexUpdateMaxTxn(tx, idx, peeredIndexEntryName(indexNodeExtinction, peerName)); err != nil {
return fmt.Errorf("failed updating missing node extinction peered index: %w", err)
}
return nil
}

Expand All @@ -105,7 +121,10 @@ func catalogInsertNode(tx WriteTxn, node *structs.Node) error {
}

if err := catalogUpdateNodesIndexes(tx, node.ModifyIndex, node.GetEnterpriseMeta(), node.PeerName); err != nil {
return err
return fmt.Errorf("failed updating nodes indexes: %w", err)
}
if err := catalogUpdateNodeIndexes(tx, node.ModifyIndex, node.Node, node.GetEnterpriseMeta(), node.PeerName); err != nil {
return fmt.Errorf("failed updating node indexes: %w", err)
}

// Update the node's service indexes as the node information is included
Expand All @@ -125,15 +144,23 @@ func catalogInsertService(tx WriteTxn, svc *structs.ServiceNode) error {
}

if err := catalogUpdateServicesIndexes(tx, svc.ModifyIndex, &svc.EnterpriseMeta, svc.PeerName); err != nil {
return err
return fmt.Errorf("failed updating services indexes: %w", err)
}

if err := catalogUpdateServiceIndexes(tx, svc.ModifyIndex, svc.ServiceName, &svc.EnterpriseMeta, svc.PeerName); err != nil {
return err
return fmt.Errorf("failed updating service indexes: %w", err)
}

if err := catalogUpdateServiceKindIndexes(tx, svc.ModifyIndex, svc.ServiceKind, &svc.EnterpriseMeta, svc.PeerName); err != nil {
return err
return fmt.Errorf("failed updating service-kind indexes: %w", err)
}

// Update the node indexes as the service information is included in node catalog queries.
if err := catalogUpdateNodesIndexes(tx, svc.ModifyIndex, &svc.EnterpriseMeta, svc.PeerName); err != nil {
return fmt.Errorf("failed updating nodes indexes: %w", err)
}
if err := catalogUpdateNodeIndexes(tx, svc.ModifyIndex, svc.Node, &svc.EnterpriseMeta, svc.PeerName); err != nil {
return fmt.Errorf("failed updating node indexes: %w", err)
}

return nil
Expand All @@ -143,6 +170,14 @@ func catalogNodesMaxIndex(tx ReadTxn, _ *acl.EnterpriseMeta, peerName string) ui
return maxIndexTxn(tx, peeredIndexEntryName(tableNodes, peerName))
}

func catalogNodeMaxIndex(tx ReadTxn, nodeName string, _ *acl.EnterpriseMeta, peerName string) uint64 {
return maxIndexTxn(tx, nodeIndexName(nodeName, nil, peerName))
}

func catalogNodeLastExtinctionIndex(tx ReadTxn, _ *acl.EnterpriseMeta, peerName string) uint64 {
return maxIndexTxn(tx, peeredIndexEntryName(indexNodeExtinction, peerName))
}

func catalogServicesMaxIndex(tx ReadTxn, _ *acl.EnterpriseMeta, peerName string) uint64 {
return maxIndexTxn(tx, peeredIndexEntryName(tableServices, peerName))
}
Expand Down Expand Up @@ -185,7 +220,6 @@ func catalogMaxIndex(tx ReadTxn, _ *acl.EnterpriseMeta, peerName string, checks
}

func catalogMaxIndexWatch(tx ReadTxn, ws memdb.WatchSet, _ *acl.EnterpriseMeta, peerName string, checks bool) uint64 {
// TODO(peering_indexes): pipe peerName here
if checks {
return maxIndexWatchTxn(tx, ws,
peeredIndexEntryName(tableChecks, peerName),
Expand All @@ -200,7 +234,7 @@ func catalogMaxIndexWatch(tx ReadTxn, ws memdb.WatchSet, _ *acl.EnterpriseMeta,
}

func catalogUpdateCheckIndexes(tx WriteTxn, idx uint64, _ *acl.EnterpriseMeta, peerName string) error {
// update the universal index entry
// update the overall index entry for snapshot
if err := indexUpdateMaxTxn(tx, idx, tableChecks); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
Expand Down
Loading

0 comments on commit 34ecbc1

Please sign in to comment.