Skip to content

Commit

Permalink
MB-31848 improve servicers throughput
Browse files Browse the repository at this point in the history
What I observed on iris is that while the throughput for Q1 is 115K,
switching to a simple 'select 1' would lower the throughput rather
than increase it.

Executing a fatal request (eg prepared statement missing) goes at 270k/sec.
I have reworked the servicers not to have channels but just a load gate (for
the ifmx people, much like PDQ gates).

If there is less that `servicers` requests running, just run the request from
the ServeHttp thread, if not, wait in a fifo queue until a slot is free.

The queue is simple for now - I want to make it better, possibly lockless, but
at this moment in time on iris I jump from 100k to 160k with this code.

Change-Id: I294449edf58e3d9418f85b185e74817f64569b67
Reviewed-on: http://review.couchbase.org/101295
Reviewed-by: Sitaram Vemulapalli <sitaram.vemulapalli@couchbase.com>
Reviewed-by: Bingjie Miao <bingjie.miao@couchbase.com>
Tested-by: Marco Greco <marco.greco@couchbase.com>
  • Loading branch information
marcogrecopriolo committed Nov 3, 2018
1 parent 2fa8074 commit b445fe2
Show file tree
Hide file tree
Showing 9 changed files with 213 additions and 233 deletions.
7 changes: 1 addition & 6 deletions server/cbq-engine/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,6 @@ func main() {
prepareds.PreparedsInit(*PREPARED_LIMIT)

numProcs := runtime.GOMAXPROCS(0)
channel := make(server.RequestChannel, *REQUEST_CAP*numProcs)
plusChannel := make(server.RequestChannel, *REQUEST_CAP*numProcs)

sys, err := system.NewDatastore(datastore)
if err != nil {
Expand All @@ -216,7 +214,7 @@ func main() {
}

server, err := server.NewServer(datastore, sys, configstore, acctstore, *NAMESPACE,
*READONLY, channel, plusChannel, *SERVICERS, *PLUS_SERVICERS,
*READONLY, *REQUEST_CAP*numProcs, *REQUEST_CAP*numProcs, *SERVICERS, *PLUS_SERVICERS,
*MAX_PARALLELISM, *TIMEOUT, *SIGNATURE, *METRICS, *ENTERPRISE,
*PRETTY, prof, ctrl)
if err != nil {
Expand Down Expand Up @@ -245,9 +243,6 @@ func main() {

audit.StartAuditService(*DATASTORE, *SERVICERS+*PLUS_SERVICERS)

go server.Serve()
go server.PlusServe()

logging.Infop("cbq-engine started",
logging.Pair{"version", util.VERSION},
logging.Pair{"datastore", *DATASTORE},
Expand Down
26 changes: 8 additions & 18 deletions server/http/service_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,26 +149,16 @@ func (this *HttpEndpoint) ServeHTTP(resp http.ResponseWriter, req *http.Request)
return
}

var res bool
if request.ScanConsistency() == datastore.UNBOUNDED {
select {
case this.server.Channel() <- request:

// Wait until the request exits.
request.Finished()
default:
// Buffer is full.
resp.WriteHeader(http.StatusServiceUnavailable)
}
res = this.server.ServiceRequest(request)

} else {
select {
case this.server.PlusChannel() <- request:

// Wait until the request exits.
request.Finished()
default:
// Buffer is full.
resp.WriteHeader(http.StatusServiceUnavailable)
}
res = this.server.PlusServiceRequest(request)
}

if !res {
resp.WriteHeader(http.StatusServiceUnavailable)
}
}

Expand Down
12 changes: 2 additions & 10 deletions server/http/service_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,17 +411,14 @@ func makeMockServer() *server.Server {
}

datastore.SetDatastore(store)
channel := make(server.RequestChannel, 10)
plusChannel := make(server.RequestChannel, 10)
server, err := server.NewServer(store, nil, nil, nil, "default",
false, channel, plusChannel, 4, 4, 0, 0, false, false, false, true, server.ProfOff, false)
false, 10, 10, 4, 4, 0, 0, false, false, false, true, server.ProfOff, false)
if err != nil {
logging.Errorp(err.Error())
os.Exit(1)
}
server.SetKeepAlive(1 << 10)

go server.Serve()
return server
}

Expand All @@ -431,12 +428,7 @@ func (this *testServer) testHandler() http.Handler {
if this.query_request.State() == server.FATAL {
return
}
select {
case this.query_server.Channel() <- this.query_request:
// Wait until the request exits.
this.query_request.Finished()
default:
}
this.query_server.ServiceRequest(this.query_request)
})
}

Expand Down
85 changes: 48 additions & 37 deletions server/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ import (
"github.com/couchbase/query/value"
)

type RequestChannel chan Request

type State string

const (
Expand Down Expand Up @@ -116,6 +114,10 @@ type Request interface {
GetTimings() execution.Operator
OriginalHttpRequest() *http.Request
IsAdHoc() bool

setSleep() // internal methods for load control
sleep()
release()
}

type RequestID interface {
Expand Down Expand Up @@ -195,37 +197,38 @@ type BaseRequest struct {
phaseStats [execution.PHASES]phaseStat

sync.RWMutex
id *requestIDImpl
client_id *clientContextIDImpl
statement string
prepared *plan.Prepared
reqType string
isPrepare bool
namedArgs map[string]value.Value
positionalArgs value.Values
namespace string
timeout time.Duration
timer *time.Timer
maxParallelism int
scanCap int64
pipelineCap int64
pipelineBatch int
readonly value.Tristate
signature value.Tristate
metrics value.Tristate
pretty value.Tristate
consistency ScanConfiguration
credentials auth.Credentials
remoteAddr string
userAgent string
requestTime time.Time
serviceTime time.Time
execTime time.Time
state State
aborted bool
errors []errors.Error
warnings []errors.Error
sync.WaitGroup
id *requestIDImpl
client_id *clientContextIDImpl
statement string
prepared *plan.Prepared
reqType string
isPrepare bool
namedArgs map[string]value.Value
positionalArgs value.Values
namespace string
timeout time.Duration
timer *time.Timer
maxParallelism int
scanCap int64
pipelineCap int64
pipelineBatch int
readonly value.Tristate
signature value.Tristate
metrics value.Tristate
pretty value.Tristate
consistency ScanConfiguration
credentials auth.Credentials
remoteAddr string
userAgent string
requestTime time.Time
serviceTime time.Time
execTime time.Time
state State
aborted bool
errors []errors.Error
warnings []errors.Error
results sync.WaitGroup
servicerGate sync.WaitGroup
stopResult chan bool // stop consuming results
stopExecute chan bool // stop executing request
stopOperator execution.Operator // notified when request execution stops
Expand Down Expand Up @@ -271,7 +274,7 @@ func newClientContextIDImpl(id string) *clientContextIDImpl {
func NewBaseRequest(rv *BaseRequest) {
rv.timeout = -1
rv.serviceTime = time.Now()
rv.Add(1)
rv.results.Add(1)
rv.state = RUNNING
rv.aborted = false
rv.stopResult = make(chan bool, 1)
Expand Down Expand Up @@ -745,11 +748,19 @@ func (this *BaseRequest) Stop(state State) {

// alert requestor that the request has completed
func (this *BaseRequest) Alert() {
this.Done()
this.results.Done()
}

// load control gate
func (this *BaseRequest) setSleep() {
this.servicerGate.Add(1)
}
func (this *BaseRequest) sleep() {
this.servicerGate.Wait()
}

func (this *BaseRequest) Finished() {
this.Wait()
func (this *BaseRequest) release() {
this.servicerGate.Done()
}

// this logs the request if needed and takes any other action required to
Expand Down

0 comments on commit b445fe2

Please sign in to comment.