Skip to content

Commit

Permalink
GOCBC-1056 Reduce CPU usage
Browse files Browse the repository at this point in the history
Motivation
----------
After doing some performance testing of cbbackupmgr we noticed that we
were spending a significant amount of CPU time in gocbcore. We'd like to
reduce this as much as possible to free up CPU time for cbbackupmgr.

Changes
-------
1) Avoid unnecessary allocations of DCP packets/TCP write buffers
2) Avoid unnecessary branching using if statements in the golden path
   where a switch statement would suffice.
3) Rewrote the memdopmap implementation to use a map structure instead
   of a doubly linked list structure. This avoids linear traversal for
   each request which depending on the scenario could result in
   not-insignificant overhead.
4) Added the ability to disable buffer acknowledgement via a option
   using the DCP agent.
5) Use a 20MB buffered reader on each memcached connection, the same
   size as used by KV engine.

Benchmarking Setup
------------------
8GB Ephemeral bucket on a single node 7.0.0 cluster, caches on
client/server being flushed prior to each test case.

DCP tests performed using a barebones DCP drain tool which streams data
from the cluster and immediatly discards it.

Memcached tests performed using cbbackupmgr generate, results should be
taken with a grain of salt.

DCP 10,000,000 * 1B
-------------------
1) 33.4s @ 18.27MB/s (103% CPU)
2) 32.7s @ 18.62MB/s (103% CPU)
3) 31.9s @ 19.12MB/s (108% CPU)

DCP 10,000,000 * 1B (patched)
-----------------------------
1) 23.7s @ 25.73MB/s (53% CPU)
2) 23.9s @ 25.47MB/s (53% CPU)
3) 22.6s @ 26.93MB/s (57% CPU)

DCP 10,000,000 * 1B (patched + disabled buffer ack)
---------------------------------------------------
1) 23.9s @ 25.51MB/s (53% CPU)
2) 24.6s @ 24.81MB/s (55% CPU)
3) 24.6s @ 24.61MB/s (50% CPU)

DCP 5,000,000 * 1024B
---------------------
1) 18.4s @ 280.61MB/s (147% CPU)
2) 18.2s @ 283.30MB/s (149% CPU)
3) 18.3s @ 282.90MB/s (148% CPU)

DCP 5,000,000 * 1024B (patched)
-------------------------------
1) 18.5s @ 279.31MB/s (62% CPU)
2) 17.9s @ 288.25MB/s (64% CPU)
3) 18.4s @ 280.18MB/s (62% CPU)

DCP 5,000,000 * 1024B (patched + disabled buffer ack)
-----------------------------------------------------
1) 18.6s @ 277.42MB/s (58% CPU)
2) 17.5s @ 294.99MB/s (61% CPU)
3) 18.4s @ 280.39MB/s (59% CPU)

Memcached 10,000,000 * 1B
-------------------------
1) 54.7s @ 11.30MB/s (293% CPU)
2) 54.7s @ 11.30MB/s (293% CPU)
3) 55.3s @ 11.10MB/s (290% CPU)

Memcached 10,000,000 * 1B (patched)
-----------------------------------
1) 51.3s @ 11.97MB/s (197% CPU)
2) 53.2s @ 11.52MB/s (189% CPU)
3) 52.6s @ 11.74MB/s (191% CPU)

Memcached 5,000,000 * 1024B
---------------------------
1) 54.6s @ 95.90MB/s (236% CPU)
2) 51.0s @ 103.5MB/s (247% CPU)
3) 51.2s @ 101.5MB/s (248% CPU)

Memcached 5,000,000 * 1024B (patched)
-------------------------------------
1) 52.8s @ 99.59MB/s (166% CPU)
2) 49.0s @ 105.6MB/s (174% CPU)
3) 48.7s @ 107.8MB/s (172% CPU)

Change-Id: I945207f8e70cd83a3f3ed14409ae6beb82fb182c
Reviewed-on: http://review.couchbase.org/c/gocbcore/+/145436
Reviewed-by: Brett Lawson <brett19@gmail.com>
Reviewed-by: Charles Dixon <chvckd@gmail.com>
Tested-by: James Lee <james.lee@couchbase.com>
  • Loading branch information
jamesl33 committed Mar 1, 2021
1 parent 727a73f commit aa0eefe
Show file tree
Hide file tree
Showing 11 changed files with 308 additions and 342 deletions.
28 changes: 14 additions & 14 deletions collectionscomponent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (suite *UnitTestSuite) TestCollectionsComponentCollectionsStateUnknownSuppo
binary.BigEndian.PutUint32(extras[8:], 8)

time.AfterFunc(time.Millisecond, func() {
req.Callback(&memdQResponse{Packet: memd.Packet{Extras: extras}}, req, nil)
req.Callback(&memdQResponse{Packet: &memd.Packet{Extras: extras}}, req, nil)
})
})
dispatcher.On("RequeueDirect", mock.AnythingOfType("*gocbcore.memdQRequest"), false).Return(&memdQRequest{}, nil).
Expand All @@ -59,7 +59,7 @@ func (suite *UnitTestSuite) TestCollectionsComponentCollectionsStateUnknownSuppo
suite.Assert().Equal(uint32(8), req.CollectionID)

time.AfterFunc(time.Millisecond, func() {
req.Callback(&memdQResponse{Packet: memd.Packet{Value: []byte("test")}}, req, nil)
req.Callback(&memdQResponse{Packet: &memd.Packet{Value: []byte("test")}}, req, nil)
})
})

Expand Down Expand Up @@ -142,7 +142,7 @@ func (suite *UnitTestSuite) TestCollectionsComponentCollectionsStateUnknownColle
suite.Assert().Equal(-1, req.ReplicaIdx)

time.AfterFunc(time.Millisecond, func() {
req.Callback(&memdQResponse{Packet: memd.Packet{}}, req, errCollectionNotFound)
req.Callback(&memdQResponse{Packet: &memd.Packet{}}, req, errCollectionNotFound)
})
}).Once()
// Second request we simulate the collection coming online.
Expand All @@ -162,7 +162,7 @@ func (suite *UnitTestSuite) TestCollectionsComponentCollectionsStateUnknownColle
binary.BigEndian.PutUint32(extras[8:], 8)

time.AfterFunc(time.Millisecond, func() {
req.Callback(&memdQResponse{Packet: memd.Packet{Extras: extras}}, req, nil)
req.Callback(&memdQResponse{Packet: &memd.Packet{Extras: extras}}, req, nil)
})
}).Once()
dispatcher.On("RequeueDirect", mock.AnythingOfType("*gocbcore.memdQRequest"), false).Return(&memdQRequest{}, nil).
Expand All @@ -177,7 +177,7 @@ func (suite *UnitTestSuite) TestCollectionsComponentCollectionsStateUnknownColle
suite.Assert().Equal(uint32(8), req.CollectionID)

time.AfterFunc(time.Millisecond, func() {
req.Callback(&memdQResponse{Packet: memd.Packet{Value: []byte("test")}}, req, nil)
req.Callback(&memdQResponse{Packet: &memd.Packet{Value: []byte("test")}}, req, nil)
})
}).Once()

Expand Down Expand Up @@ -258,7 +258,7 @@ func (suite *UnitTestSuite) TestCollectionsComponentCollectionsStateUnknownGener
suite.Assert().Equal(-1, req.ReplicaIdx)

time.AfterFunc(time.Millisecond, func() {
req.Callback(&memdQResponse{Packet: memd.Packet{}}, req, errInternalServerFailure)
req.Callback(&memdQResponse{Packet: &memd.Packet{}}, req, errInternalServerFailure)
})
}).Once()

Expand Down Expand Up @@ -462,7 +462,7 @@ func (suite *UnitTestSuite) TestCollectionsComponentCollectionsSupportedCollecti
binary.BigEndian.PutUint32(extras[8:], 8)

time.AfterFunc(time.Millisecond, func() {
req.Callback(&memdQResponse{Packet: memd.Packet{Extras: extras}}, req, nil)
req.Callback(&memdQResponse{Packet: &memd.Packet{Extras: extras}}, req, nil)
})
})
dispatcher.On("RequeueDirect", mock.AnythingOfType("*gocbcore.memdQRequest"), false).Return(&memdQRequest{}, nil).
Expand All @@ -477,7 +477,7 @@ func (suite *UnitTestSuite) TestCollectionsComponentCollectionsSupportedCollecti
suite.Assert().Equal(uint32(8), req.CollectionID)

time.AfterFunc(time.Millisecond, func() {
req.Callback(&memdQResponse{Packet: memd.Packet{Value: []byte("test")}}, req, nil)
req.Callback(&memdQResponse{Packet: &memd.Packet{Value: []byte("test")}}, req, nil)
})
})

Expand Down Expand Up @@ -553,7 +553,7 @@ func (suite *UnitTestSuite) TestCollectionsComponentCollectionsSupportedCollecti
suite.Assert().Equal(-1, req.ReplicaIdx)

time.AfterFunc(time.Millisecond, func() {
req.Callback(&memdQResponse{Packet: memd.Packet{}}, req, errCollectionNotFound)
req.Callback(&memdQResponse{Packet: &memd.Packet{}}, req, errCollectionNotFound)
})
}).Once()
dispatcher.On("DispatchDirect", mock.AnythingOfType("*gocbcore.memdQRequest")).Return(&memdQRequest{}, nil).
Expand All @@ -572,7 +572,7 @@ func (suite *UnitTestSuite) TestCollectionsComponentCollectionsSupportedCollecti
binary.BigEndian.PutUint32(extras[8:], 8)

time.AfterFunc(time.Millisecond, func() {
req.Callback(&memdQResponse{Packet: memd.Packet{Extras: extras}}, req, nil)
req.Callback(&memdQResponse{Packet: &memd.Packet{Extras: extras}}, req, nil)
})
}).Once()

Expand All @@ -588,7 +588,7 @@ func (suite *UnitTestSuite) TestCollectionsComponentCollectionsSupportedCollecti
suite.Assert().Equal(uint32(8), req.CollectionID)

time.AfterFunc(time.Millisecond, func() {
req.Callback(&memdQResponse{Packet: memd.Packet{Value: []byte("test")}}, req, nil)
req.Callback(&memdQResponse{Packet: &memd.Packet{Value: []byte("test")}}, req, nil)
})
}).Once()

Expand Down Expand Up @@ -667,7 +667,7 @@ func (suite *UnitTestSuite) TestCollectionsComponentCollectionsSupportedCollecti
binary.BigEndian.PutUint32(extras[8:], 8)

time.AfterFunc(time.Millisecond, func() {
req.Callback(&memdQResponse{Packet: memd.Packet{Extras: extras}}, req, nil)
req.Callback(&memdQResponse{Packet: &memd.Packet{Extras: extras}}, req, nil)
})
}).Once()
// The second request should be queued due to cid being pending so it should get requeued.
Expand All @@ -683,7 +683,7 @@ func (suite *UnitTestSuite) TestCollectionsComponentCollectionsSupportedCollecti
suite.Assert().Equal(uint32(8), req.CollectionID)

time.AfterFunc(time.Millisecond, func() {
req.Callback(&memdQResponse{Packet: memd.Packet{Value: []byte("test")}}, req, nil)
req.Callback(&memdQResponse{Packet: &memd.Packet{Value: []byte("test")}}, req, nil)
})
}).Twice()
// The third request should go straight through to Dispatch.
Expand All @@ -699,7 +699,7 @@ func (suite *UnitTestSuite) TestCollectionsComponentCollectionsSupportedCollecti
suite.Assert().Equal(uint32(8), req.CollectionID)

time.AfterFunc(time.Millisecond, func() {
req.Callback(&memdQResponse{Packet: memd.Packet{Value: []byte("test")}}, req, nil)
req.Callback(&memdQResponse{Packet: &memd.Packet{Value: []byte("test")}}, req, nil)
})
}).Once()

Expand Down
8 changes: 5 additions & 3 deletions dcpagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func CreateDcpAgent(config *DCPAgentConfig, dcpStreamName string, openFlags memd
useJSONHello := !config.DisableJSONHello
useXErrorHello := !config.DisableXErrorHello
useSyncReplicationHello := !config.DisableSyncReplicationHello
dcpBufferSize := 8 * 1024 * 1024
dcpBufferSize := 20 * 1024 * 1024
compressionMinSize := 32
compressionMinRatio := 0.83
dcpBackfillOrderStr := ""
Expand Down Expand Up @@ -177,8 +177,10 @@ func CreateDcpAgent(config *DCPAgentConfig, dcpStreamName string, openFlags memd
}
}

if err := sclient.ExecEnableDcpBufferAck(dcpBufferSize, deadline); err != nil {
return err
if !config.DisableBufferAcknowledgement {
if err := sclient.ExecEnableDcpBufferAck(dcpBufferSize, deadline); err != nil {
return err
}
}

return sclient.ExecEnableDcpClientEnd(deadline)
Expand Down
4 changes: 3 additions & 1 deletion dcpagent_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ type DCPAgentConfig struct {
UseStreamID bool
UseOSOBackfill bool
BackfillOrder DCPBackfillOrder
DCPBufferSize int

DCPBufferSize int
DisableBufferAcknowledgement bool
}

func (config *DCPAgentConfig) redacted() interface{} {
Expand Down
Loading

0 comments on commit aa0eefe

Please sign in to comment.