Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Performance On Large clusters] Reduce updates on large services #4720

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
63 changes: 48 additions & 15 deletions agent/consul/state/catalog.go
Expand Up @@ -430,6 +430,11 @@ func (s *Store) ensureNodeTxn(tx *memdb.Txn, idx uint64, node *structs.Node) err
// Get the indexes.
if n != nil {
node.CreateIndex = n.CreateIndex
node.ModifyIndex = n.ModifyIndex
// We do not need to update anything
if node.IsSame(n) {
return nil
}
node.ModifyIndex = idx
} else {
node.CreateIndex = idx
Expand Down Expand Up @@ -687,9 +692,16 @@ func (s *Store) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, svc *st
// conversion doesn't populate any of the node-specific information.
// That's always populated when we read from the state store.
entry := svc.ToServiceNode(node)
modified := true
if existing != nil {
entry.CreateIndex = existing.(*structs.ServiceNode).CreateIndex
entry.ModifyIndex = idx
serviceNode := existing.(*structs.ServiceNode)
entry.CreateIndex = serviceNode.CreateIndex
entry.ModifyIndex = serviceNode.ModifyIndex
if entry.IsSame(serviceNode) {
modified = false
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume there is a reason why we cannot just return here like you did for the IsSame check on the node. If so, it would be good to have a comment explaining why you can't: why the node still needs to be inserted, and why the "index" table doesn't require updating.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really; it was just to avoid changing too many tests.
Another possible reason was that some values get added into the Consul DB, for instance pre-filled values from ToServiceNode(), such as Weights.
While the semantics do not change, we are sure we are keeping the same kind of data before and after the patch (while upgrade semantics stay untouched).

Added comments in next patch

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As for the default values coming from ToServiceNode, that call happens prior to the IsSame check so if any of the defaults change then IsSame should return false and cause reinsertion.

As for the other reason of wanting to return ErrMissingNode, why can't we just move the node check block to before the ToServiceNode call?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, I wanted to have a change as small as possible, but you are definitely right, changing that...

} else {
entry.ModifyIndex = idx
}
} else {
entry.CreateIndex = idx
entry.ModifyIndex = idx
Expand All @@ -708,11 +720,13 @@ func (s *Store) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, svc *st
if err := tx.Insert("services", entry); err != nil {
return fmt.Errorf("failed inserting service: %s", err)
}
if err := tx.Insert("index", &IndexEntry{"services", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.Service), idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
if modified {
if err := tx.Insert("index", &IndexEntry{"services", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.Service), idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
}

return nil
Expand Down Expand Up @@ -1236,8 +1250,9 @@ func (s *Store) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthChec

// Set the indexes
if existing != nil {
hc.CreateIndex = existing.(*structs.HealthCheck).CreateIndex
hc.ModifyIndex = idx
existingCheck := existing.(*structs.HealthCheck)
hc.CreateIndex = existingCheck.CreateIndex
hc.ModifyIndex = existingCheck.ModifyIndex
} else {
hc.CreateIndex = idx
hc.ModifyIndex = idx
Expand All @@ -1257,6 +1272,7 @@ func (s *Store) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthChec
return ErrMissingNode
}

modified := true
// If the check is associated with a service, check that we have
// a registration for the service.
if hc.ServiceID != "" {
Expand All @@ -1272,14 +1288,24 @@ func (s *Store) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthChec
svc := service.(*structs.ServiceNode)
hc.ServiceName = svc.ServiceName
hc.ServiceTags = svc.ServiceTags
if err = tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
if existing != nil && existing.(*structs.HealthCheck).IsSame(hc) {
modified = false
} else {
// The check has been modified, so we trigger a service index change
if err = tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
}
} else {
// Update the status for all the services associated with this node
err = s.updateAllServiceIndexesOfNode(tx, idx, hc.Node)
if err != nil {
return err
if existing != nil && existing.(*structs.HealthCheck).IsSame(hc) {
modified = false
} else {
// Since the check has been modified, it impacts all services of node
// Update the status for all the services associated with this node
err = s.updateAllServiceIndexesOfNode(tx, idx, hc.Node)
if err != nil {
return err
}
}
}

Expand All @@ -1303,6 +1329,13 @@ func (s *Store) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthChec
}
}
}
if modified {
// We update the modify index ONLY if something has changed. Thus,
// with constant output, no change is seen when watching a service
// with a huge number of nodes where anti-entropy continuously updates
// the checks, but not the values within the checks
hc.ModifyIndex = idx
}

// Persist the check registration in the db.
if err := tx.Insert("checks", hc); err != nil {
Expand Down
117 changes: 87 additions & 30 deletions agent/consul/state/catalog_test.go
Expand Up @@ -294,7 +294,7 @@ func TestStateStore_EnsureRegistration(t *testing.T) {
CheckID: "check1",
Name: "check",
Status: "critical",
RaftIndex: structs.RaftIndex{CreateIndex: 3, ModifyIndex: 4},
RaftIndex: structs.RaftIndex{CreateIndex: 3, ModifyIndex: 3},
},
&structs.HealthCheck{
Node: "node1",
Expand Down Expand Up @@ -491,8 +491,8 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) {
}
c1 := out[0]
if c1.Node != nodeName || c1.CheckID != "check1" || c1.Name != "check" ||
c1.CreateIndex != 3 || c1.ModifyIndex != 4 {
t.Fatalf("bad check returned: %#v", c1)
c1.CreateIndex != 3 || c1.ModifyIndex != 3 {
t.Fatalf("bad check returned, should not be modified: %#v", c1)
}

c2 := out[1]
Expand All @@ -508,6 +508,9 @@ func deprecatedEnsureNodeWithoutIDCanRegister(t *testing.T, s *Store, nodeName s
in := &structs.Node{
Node: nodeName,
Address: "1.1.1.9",
Meta: map[string]string{
"version": string(txIdx),
},
}
if err := s.EnsureNode(txIdx, in); err != nil {
t.Fatalf("err: %s", err)
Expand All @@ -517,10 +520,10 @@ func deprecatedEnsureNodeWithoutIDCanRegister(t *testing.T, s *Store, nodeName s
t.Fatalf("err: %s", err)
}
if idx != txIdx {
t.Fatalf("index should be %q, was: %q", txIdx, idx)
t.Fatalf("index should be %v, was: %v", txIdx, idx)
}
if out.Node != nodeName {
t.Fatalf("unexpected result out = %q, nodeName supposed to be %s", out, nodeName)
t.Fatalf("unexpected result out = %v, nodeName supposed to be %s", out, nodeName)
}
}

Expand Down Expand Up @@ -726,8 +729,12 @@ func TestStateStore_EnsureNode(t *testing.T) {
}

// Update the node registration
in.Address = "1.1.1.2"
if err := s.EnsureNode(2, in); err != nil {
in2 := &structs.Node{
ID: in.ID,
Node: in.Node,
Address: "1.1.1.2",
}
if err := s.EnsureNode(2, in2); err != nil {
t.Fatalf("err: %s", err)
}

Expand All @@ -745,15 +752,32 @@ func TestStateStore_EnsureNode(t *testing.T) {
t.Fatalf("bad index: %d", idx)
}

// Re-inserting the same data should not modify ModifyIndex
if err := s.EnsureNode(3, in2); err != nil {
t.Fatalf("err: %s", err)
}
idx, out, err = s.GetNode("node1")
if err != nil {
t.Fatalf("err: %s", err)
}
if out.CreateIndex != 1 || out.ModifyIndex != 2 || out.Address != "1.1.1.2" {
t.Fatalf("node was modified: %#v", out)
}

// Node upsert preserves the create index
if err := s.EnsureNode(3, in); err != nil {
in3 := &structs.Node{
ID: in.ID,
Node: in.Node,
Address: "1.1.1.3",
}
if err := s.EnsureNode(3, in3); err != nil {
t.Fatalf("err: %s", err)
}
idx, out, err = s.GetNode("node1")
if err != nil {
t.Fatalf("err: %s", err)
}
if out.CreateIndex != 1 || out.ModifyIndex != 3 || out.Address != "1.1.1.2" {
if out.CreateIndex != 1 || out.ModifyIndex != 3 || out.Address != "1.1.1.3" {
t.Fatalf("node was modified: %#v", out)
}
if idx != 3 {
Expand Down Expand Up @@ -2177,32 +2201,60 @@ func TestStateStore_EnsureCheck(t *testing.T) {
t.Fatalf("bad: %#v", checks[0])
}

// Modify the health check
check.Output = "bbb"
if err := s.EnsureCheck(4, check); err != nil {
t.Fatalf("err: %s", err)
}
testCheckOutput := func(expectedNodeIndex, expectedIndexForCheck uint64, outputTxt string) {
// Check that we successfully updated
idx, checks, err = s.NodeChecks(nil, "node1")
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != expectedNodeIndex {
t.Fatalf("bad index: %d", idx)
}

// Check that we successfully updated
idx, checks, err = s.NodeChecks(nil, "node1")
if err != nil {
t.Fatalf("err: %s", err)
if len(checks) != 1 {
t.Fatalf("wrong number of checks: %d", len(checks))
}
if checks[0].Output != outputTxt {
t.Fatalf("wrong check output: %#v", checks[0])
}
if checks[0].CreateIndex != 3 || checks[0].ModifyIndex != expectedIndexForCheck {
t.Fatalf("bad index: %#v, expectedIndexForCheck:=%v ", checks[0], expectedIndexForCheck)
}
}
if idx != 4 {
t.Fatalf("bad index: %d", idx)
// Do not really modify the health check content
check = &structs.HealthCheck{
Node: "node1",
CheckID: "check1",
Name: "redis check",
Status: api.HealthPassing,
Notes: "test check",
Output: "aaa",
ServiceID: "service1",
ServiceName: "redis",
}
if len(checks) != 1 {
t.Fatalf("wrong number of checks: %d", len(checks))
if err := s.EnsureCheck(4, check); err != nil {
t.Fatalf("err: %s", err)
}
if checks[0].Output != "bbb" {
t.Fatalf("wrong check output: %#v", checks[0])
testCheckOutput(4, 3, check.Output)

// Do modify the health check
check = &structs.HealthCheck{
Node: "node1",
CheckID: "check1",
Name: "redis check",
Status: api.HealthPassing,
Notes: "test check",
Output: "bbbmodified",
ServiceID: "service1",
ServiceName: "redis",
}
if checks[0].CreateIndex != 3 || checks[0].ModifyIndex != 4 {
t.Fatalf("bad index: %#v", checks[0])
if err := s.EnsureCheck(5, check); err != nil {
t.Fatalf("err: %s", err)
}
testCheckOutput(5, 5, "bbbmodified")

// Index tables were updated
if idx := s.maxIndex("checks"); idx != 4 {
if idx := s.maxIndex("checks"); idx != 5 {
t.Fatalf("bad index: %d", idx)
}
}
Expand Down Expand Up @@ -2890,7 +2942,7 @@ func TestStateStore_CheckServiceNodes(t *testing.T) {
}

// Node updates alter the returned index and fire the watch.
testRegisterNode(t, s, 8, "node1")
testRegisterNodeWithChange(t, s, 8, "node1")
if !watchFired(ws) {
t.Fatalf("bad")
}
Expand All @@ -2905,7 +2957,8 @@ func TestStateStore_CheckServiceNodes(t *testing.T) {
}

// Service updates alter the returned index and fire the watch.
testRegisterService(t, s, 9, "node1", "service1")

testRegisterServiceWithChange(t, s, 9, "node1", "service1", true)
if !watchFired(ws) {
t.Fatalf("bad")
}
Expand Down Expand Up @@ -3261,6 +3314,7 @@ func TestStateStore_NodeInfo_NodeDump(t *testing.T) {
ID: "service1",
Service: "service1",
Address: "1.1.1.1",
Meta: make(map[string]string),
Port: 1111,
Weights: &structs.Weights{Passing: 1, Warning: 1},
RaftIndex: structs.RaftIndex{
Expand All @@ -3272,6 +3326,7 @@ func TestStateStore_NodeInfo_NodeDump(t *testing.T) {
ID: "service2",
Service: "service2",
Address: "1.1.1.1",
Meta: make(map[string]string),
Port: 1111,
Weights: &structs.Weights{Passing: 1, Warning: 1},
RaftIndex: structs.RaftIndex{
Expand Down Expand Up @@ -3313,6 +3368,7 @@ func TestStateStore_NodeInfo_NodeDump(t *testing.T) {
Service: "service1",
Address: "1.1.1.1",
Port: 1111,
Meta: make(map[string]string),
Weights: &structs.Weights{Passing: 1, Warning: 1},
RaftIndex: structs.RaftIndex{
CreateIndex: 4,
Expand All @@ -3324,6 +3380,7 @@ func TestStateStore_NodeInfo_NodeDump(t *testing.T) {
Service: "service2",
Address: "1.1.1.1",
Port: 1111,
Meta: make(map[string]string),
Weights: &structs.Weights{Passing: 1, Warning: 1},
RaftIndex: structs.RaftIndex{
CreateIndex: 5,
Expand All @@ -3344,7 +3401,7 @@ func TestStateStore_NodeInfo_NodeDump(t *testing.T) {
t.Fatalf("bad index: %d", idx)
}
if len(dump) != 1 || !reflect.DeepEqual(dump[0], expect[0]) {
t.Fatalf("bad: %#v", dump)
t.Fatalf("bad: len=%#v dump=%#v expect=%#v", len(dump), dump[0], expect[0])
}

// Generate a dump of all the nodes
Expand Down
25 changes: 24 additions & 1 deletion agent/consul/state/state_store_test.go
Expand Up @@ -40,6 +40,13 @@ func testRegisterNode(t *testing.T, s *Store, idx uint64, nodeID string) {
testRegisterNodeWithMeta(t, s, idx, nodeID, nil)
}

// testRegisterNodeWithChange registers nodeID at index idx through
// testRegisterNodeWithMeta, attaching a "version" meta entry derived from idx.
// The idx-dependent meta is intended to make this registration differ from a
// previous registration of the same node.
//
// NOTE(review): string(idx) converts the integer to the UTF-8 encoding of that
// code point (a single rune), not to its decimal digits; go vet reports this
// kind of conversion. Confirm the rune value is acceptable here, or switch to
// a decimal formatting helper once the import is available.
func testRegisterNodeWithChange(t *testing.T, s *Store, idx uint64, nodeID string) {
	versionMeta := map[string]string{"version": string(idx)}
	testRegisterNodeWithMeta(t, s, idx, nodeID, versionMeta)
}

func testRegisterNodeWithMeta(t *testing.T, s *Store, idx uint64, nodeID string, meta map[string]string) {
node := &structs.Node{Node: nodeID, Meta: meta}
if err := s.EnsureNode(idx, node); err != nil {
Expand All @@ -57,12 +64,20 @@ func testRegisterNodeWithMeta(t *testing.T, s *Store, idx uint64, nodeID string,
}
}

func testRegisterService(t *testing.T, s *Store, idx uint64, nodeID, serviceID string) {
// testRegisterServiceWithChange registers a service and allows ensuring the consul index is updated
// even if the service already exists, by setting `modifyAccordingIndex`.
// This is done by setting the transaction ID in the "version" meta so the service will be updated if it already exists
func testRegisterServiceWithChange(t *testing.T, s *Store, idx uint64, nodeID, serviceID string, modifyAccordingIndex bool) {
meta := make(map[string]string)
if modifyAccordingIndex {
meta["version"] = string(idx)
}
svc := &structs.NodeService{
ID: serviceID,
Service: serviceID,
Address: "1.1.1.1",
Port: 1111,
Meta: meta,
}
if err := s.EnsureService(idx, nodeID, svc); err != nil {
t.Fatalf("err: %s", err)
Expand All @@ -81,6 +96,14 @@ func testRegisterService(t *testing.T, s *Store, idx uint64, nodeID, serviceID s
}
}

// testRegisterService registers a service at the given transaction idx.
// It delegates to testRegisterServiceWithChange with modifyAccordingIndex
// disabled, so no idx-derived "version" meta is attached; if the service
// already exists, the transaction number might therefore not be increased.
// Call testRegisterServiceWithChange with modifyAccordingIndex enabled when
// the registration must carry idx in the service Meta so that the index is
// updated.
func testRegisterService(t *testing.T, s *Store, idx uint64, nodeID, serviceID string) {
	const modifyAccordingIndex = false
	testRegisterServiceWithChange(t, s, idx, nodeID, serviceID, modifyAccordingIndex)
}

func testRegisterCheck(t *testing.T, s *Store, idx uint64,
nodeID string, serviceID string, checkID types.CheckID, state string) {
chk := &structs.HealthCheck{
Expand Down