Skip to content

Commit

Permalink
GOCBC-427: Change context to cover the whole stream for services
Browse files Browse the repository at this point in the history
Motivation
----------
At present the context is only used to time out the initial
connection of a service HTTP request. Instead, the context should
last for the lifetime of the stream.

Changes
-------
Change context creation so that if the user sets a context whose
deadline is earlier than whatever timeout they set, we don’t touch it
and allow it to hard close the connection. If they don’t (or the
timeout is shorter), we create a context with the timeout plus 1s.
Also detect timeout errors on Close.

Change-Id: Iaee0c397e3eed18306c42f48488396ad3be46b65
Reviewed-on: http://review.couchbase.org/107742
Reviewed-by: Brett Lawson <brett19@gmail.com>
Tested-by: Charles Dixon <chvckd@gmail.com>
  • Loading branch information
chvck committed Apr 15, 2019
1 parent 95c8f73 commit 4a9cd52
Show file tree
Hide file tree
Showing 9 changed files with 269 additions and 121 deletions.
29 changes: 5 additions & 24 deletions bucket_viewquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type ViewResults struct {
errReason string
errMessage string

cancel context.CancelFunc
ctx context.Context
streamResult *streamingResult
strace opentracing.Span
err error
Expand Down Expand Up @@ -82,8 +82,8 @@ func (r *ViewResults) Close() error {
if r.strace != nil {
r.strace.Finish()
}
if r.cancel != nil {
r.cancel()
if r.ctx.Err() == context.DeadlineExceeded {
return timeoutError{}
}
if vErr := r.makeError(); vErr != nil {
return vErr
Expand Down Expand Up @@ -201,16 +201,6 @@ func (r *ViewResultsMetadata) TotalRows() int {
return r.totalRows
}

// runContextTimeout blocks until either ctx is done or a value is received on
// doneChan, whichever happens first.
//
// If doneChan fires first the function simply returns. If ctx finishes first,
// reqCancel is invoked and the function then waits for a value on doneChan
// before returning, so it never exits until doneChan has delivered a value.
func (b *Bucket) runContextTimeout(ctx context.Context, reqCancel context.CancelFunc, doneChan chan struct{}) {
	select {
	case <-doneChan:
		// Completion was signalled before ctx ended; nothing to cancel.
		return
	case <-ctx.Done():
	}

	// ctx ended first: cancel the request, then wait for the completion signal.
	reqCancel()
	<-doneChan
}

// ViewQuery performs a view query and returns a list of rows or an error.
func (b *Bucket) ViewQuery(designDoc string, viewName string, opts *ViewOptions) (*ViewResults, error) {
if opts == nil {
Expand Down Expand Up @@ -286,17 +276,12 @@ func (b *Bucket) SpatialViewQuery(designDoc string, viewName string, opts *Spati
func (b *Bucket) executeViewQuery(ctx context.Context, traceCtx opentracing.SpanContext, viewType, ddoc, viewName string,
options url.Values, provider httpProvider) (*ViewResults, error) {

// we only want ctx to timeout the initial connection rather than the stream
reqCtx, reqCancel := context.WithCancel(context.Background())
doneChan := make(chan struct{})
go b.runContextTimeout(ctx, reqCancel, doneChan)

reqUri := fmt.Sprintf("/_design/%s/%s/%s?%s", ddoc, viewType, viewName, options.Encode())
req := &gocbcore.HttpRequest{
Service: gocbcore.CapiService,
Path: reqUri,
Method: "GET",
Context: reqCtx,
Context: ctx,
}

dtrace := opentracing.GlobalTracer().StartSpan("dispatch", opentracing.ChildOf(traceCtx))
Expand Down Expand Up @@ -324,7 +309,6 @@ func (b *Bucket) executeViewQuery(ctx context.Context, traceCtx opentracing.Span

if resp.StatusCode == 500 {
// We have to handle the views 500 case as a special case because the body can be of form [] or {}
defer reqCancel()
defer strace.Finish()
defer func() {
err := resp.Body.Close()
Expand Down Expand Up @@ -372,7 +356,6 @@ func (b *Bucket) executeViewQuery(ctx context.Context, traceCtx opentracing.Span

streamResult, err := newStreamingResults(resp.Body, queryResults.readAttribute)
if err != nil {
reqCancel()
strace.Finish()
return nil, err
}
Expand All @@ -383,7 +366,6 @@ func (b *Bucket) executeViewQuery(ctx context.Context, traceCtx opentracing.Span
if bodyErr != nil {
logDebugf("Failed to close socket (%s)", bodyErr.Error())
}
reqCancel()
strace.Finish()
return nil, err
}
Expand All @@ -392,13 +374,12 @@ func (b *Bucket) executeViewQuery(ctx context.Context, traceCtx opentracing.Span

if streamResult.HasRows() {
queryResults.strace = strace
queryResults.cancel = reqCancel
queryResults.ctx = ctx
} else {
bodyErr := streamResult.Close()
if bodyErr != nil {
logDebugf("Failed to close response body, %s", bodyErr.Error())
}
reqCancel()
strace.Finish()

// There are no rows and there are errors so fast fail
Expand Down
38 changes: 21 additions & 17 deletions cluster_analyticsquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type AnalyticsResults struct {
cancel context.CancelFunc
strace opentracing.Span
httpProvider httpProvider
ctx context.Context
}

// Next assigns the next result from the results into the value pointer, returning whether the read was successful.
Expand Down Expand Up @@ -123,9 +124,13 @@ func (r *AnalyticsResults) Close() error {
if r.strace != nil {
r.strace.Finish()
}
ctxErr := r.ctx.Err()
if r.cancel != nil {
r.cancel()
}
if ctxErr == context.DeadlineExceeded {
return timeoutError{}
}
if r.err != nil {
return r.err
}
Expand Down Expand Up @@ -362,10 +367,13 @@ func (c *Cluster) analyticsQuery(ctx context.Context, traceCtx opentracing.SpanC
now := time.Now()
d, ok := ctx.Deadline()

if !ok || now.Add(timeout).Before(d) {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
// If we don't need to then we don't touch the original ctx value so that the Done channel is set
// in a predictable manner. If we create a context then we need to create it with timeout + 1 second
// so that the server closes the connection rather than us. This is just a better user experience.
timeoutPlusBuffer := timeout + time.Second
var cancel context.CancelFunc
if !ok || now.Add(timeoutPlusBuffer).Before(d) {
ctx, cancel = context.WithTimeout(ctx, timeoutPlusBuffer)
} else {
timeout = d.Sub(now)
}
Expand All @@ -379,7 +387,7 @@ func (c *Cluster) analyticsQuery(ctx context.Context, traceCtx opentracing.SpanC
for {
retries++
etrace := opentracing.GlobalTracer().StartSpan("execute", opentracing.ChildOf(traceCtx))
res, err = c.executeAnalyticsQuery(ctx, traceCtx, queryOpts, provider)
res, err = c.executeAnalyticsQuery(ctx, traceCtx, queryOpts, provider, cancel)
etrace.Finish()
if err == nil {
break
Expand All @@ -393,14 +401,18 @@ func (c *Cluster) analyticsQuery(ctx context.Context, traceCtx opentracing.SpanC
}

if err != nil {
// only cancel on error, if we cancel when things have gone to plan then we'll prematurely close the stream
if cancel != nil {
cancel()
}
return nil, err
}

return res, nil
}

func (c *Cluster) executeAnalyticsQuery(ctx context.Context, traceCtx opentracing.SpanContext, opts map[string]interface{},
provider httpProvider) (*AnalyticsResults, error) {
provider httpProvider, cancel context.CancelFunc) (*AnalyticsResults, error) {

// priority is sent as a header not in the body
priority, priorityCastOK := opts["priority"].(int)
Expand All @@ -413,16 +425,11 @@ func (c *Cluster) executeAnalyticsQuery(ctx context.Context, traceCtx opentracin
return nil, errors.Wrap(err, "failed to marshal query request body")
}

// we only want ctx to timeout the initial connection rather than the stream
reqCtx, reqCancel := context.WithCancel(context.Background())
doneChan := make(chan struct{})
go c.runContextTimeout(ctx, reqCancel, doneChan)

req := &gocbcore.HttpRequest{
Service: gocbcore.CbasService,
Path: "/analytics/service",
Method: "POST",
Context: reqCtx,
Context: ctx,
Body: reqJSON,
}

Expand All @@ -434,7 +441,6 @@ func (c *Cluster) executeAnalyticsQuery(ctx context.Context, traceCtx opentracin
dtrace := opentracing.GlobalTracer().StartSpan("dispatch", opentracing.ChildOf(traceCtx))

resp, err := provider.DoHttpRequest(req)
doneChan <- struct{}{}
if err != nil {
dtrace.Finish()
if err == gocbcore.ErrNoCbasService {
Expand Down Expand Up @@ -471,7 +477,6 @@ func (c *Cluster) executeAnalyticsQuery(ctx context.Context, traceCtx opentracin

streamResult, err := newStreamingResults(resp.Body, queryResults.readAttribute)
if err != nil {
reqCancel()
strace.Finish()
return nil, err
}
Expand All @@ -482,7 +487,6 @@ func (c *Cluster) executeAnalyticsQuery(ctx context.Context, traceCtx opentracin
if bodyErr != nil {
logDebugf("Failed to close socket (%s)", bodyErr.Error())
}
reqCancel()
strace.Finish()
return nil, err
}
Expand All @@ -493,13 +497,13 @@ func (c *Cluster) executeAnalyticsQuery(ctx context.Context, traceCtx opentracin

if streamResult.HasRows() {
queryResults.strace = strace
queryResults.cancel = reqCancel
queryResults.cancel = cancel
queryResults.ctx = ctx
} else {
bodyErr := streamResult.Close()
if bodyErr != nil {
logDebugf("Failed to close response body, %s", bodyErr.Error())
}
reqCancel()
strace.Finish()

// There are no rows and there are errors so fast fail
Expand Down
81 changes: 64 additions & 17 deletions cluster_analyticsquery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,7 @@ func TestAnalyticsQuery(t *testing.T) {
func testCreateAnalyticsDataset(t *testing.T) {
// _p/cbas-admin/analytics/node/agg/stats/remaining
query := "CREATE DATASET `travel-sample` ON `travel-sample`;"
res, err := globalCluster.AnalyticsQuery(query, nil)
if err != nil {
t.Fatalf("Failed to create dataset %v", err)
}

// assume there aren't any rows, there shouldn't be any rows
err = res.Close()
_, err := globalCluster.AnalyticsQuery(query, nil)
if err != nil {
aErrs, ok := err.(AnalyticsQueryErrors)
if !ok {
Expand All @@ -74,15 +68,10 @@ func testCreateAnalyticsDataset(t *testing.T) {
}

query = "CONNECT LINK Local;"
res, err = globalCluster.AnalyticsQuery(query, nil)
_, err = globalCluster.AnalyticsQuery(query, nil)
if err != nil {
t.Fatalf("Failed to connect link %v", err)
}

err = res.Close()
if err != nil {
t.Fatalf("Failed to connect link: %v", err)
}
}

func testWaitAnalyticsDataset(errCh chan error) {
Expand Down Expand Up @@ -394,7 +383,7 @@ func TestAnalyticsQueryServiceNotFound(t *testing.T) {
}
}

func TestAnalyticsQueryConnectTimeout(t *testing.T) {
func TestAnalyticsQueryClientSideTimeout(t *testing.T) {
statement := "select `beer-sample`.* from `beer-sample` WHERE `type` = ? ORDER BY brewery_id, name"
timeout := 20 * time.Millisecond
clusterTimeout := 50 * time.Second
Expand Down Expand Up @@ -427,7 +416,7 @@ func TestAnalyticsQueryConnectTimeout(t *testing.T) {
// we can't use time travel here as we need the context to actually timeout
time.Sleep(100 * time.Millisecond)

return nil, context.Canceled
return nil, context.DeadlineExceeded
}

provider := &mockHTTPProvider{
Expand All @@ -445,6 +434,64 @@ func TestAnalyticsQueryConnectTimeout(t *testing.T) {
}
}

// TestAnalyticsQueryStreamTimeout checks that AnalyticsQuery returns a timeout
// error when the HTTP request itself succeeds (status 200) and the response
// body is the "analytics_timeout" test dataset.
//
// NOTE(review): the dataset presumably contains a server-reported timeout;
// confirm against the fixture.
//
// The mocked HTTP handler also verifies that the request body carries a
// "timeout" option that parses as a duration within 50ms of the configured
// ServerSideTimeout.
func TestAnalyticsQueryStreamTimeout(t *testing.T) {
	dataBytes, err := loadRawTestDataset("analytics_timeout")
	if err != nil {
		t.Fatalf("Could not read test dataset: %v", err)
	}

	statement := "select `beer-sample`.* from `beer-sample` WHERE `type` = ? ORDER BY brewery_id, name"
	timeout := 20 * time.Millisecond
	clusterTimeout := 50 * time.Second
	// The user context deadline (60s) is far later than the server side timeout,
	// so it is not the thing expected to trigger the timeout in this test.
	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()

	doHTTP := func(req *gocbcore.HttpRequest) (*gocbcore.HttpResponse, error) {
		testAssertAnalyticsQueryRequest(t, req)

		var opts map[string]interface{}
		err := json.Unmarshal(req.Body, &opts)
		if err != nil {
			t.Fatalf("Failed to unmarshal request body %v", err)
		}

		optsTimeout, ok := opts["timeout"]
		if !ok {
			t.Fatalf("Request query options missing timeout")
		}

		// Use a checked assertion so that a non-string timeout fails the test
		// with a clear message instead of panicking.
		timeoutStr, isString := optsTimeout.(string)
		if !isString {
			t.Fatalf("Expected timeout to be a string but was %T (%v)", optsTimeout, optsTimeout)
		}

		dur, err := time.ParseDuration(timeoutStr)
		if err != nil {
			t.Fatalf("Could not parse timeout: %v", err)
		}

		// Allow 50ms of slack either side of the requested timeout.
		if dur < (timeout-50*time.Millisecond) || dur > (timeout+50*time.Millisecond) {
			t.Fatalf("Expected timeout to be %s but was %s", timeout.String(), timeoutStr)
		}

		resp := &gocbcore.HttpResponse{
			StatusCode: 200,
			Body:       &testReadCloser{bytes.NewBuffer(dataBytes), nil},
		}

		return resp, nil
	}

	provider := &mockHTTPProvider{
		doFn: doHTTP,
	}

	cluster := testGetClusterForHTTP(provider, clusterTimeout, 0, 0)

	_, err = cluster.AnalyticsQuery(statement, &AnalyticsQueryOptions{
		ServerSideTimeout: timeout,
		Context:           ctx,
	})
	if err == nil || !IsTimeoutError(err) {
		t.Fatalf("Error should have been timeout but was %v", err)
	}
}

func TestAnalyticsQueryConnectContextTimeout(t *testing.T) {
statement := "select `beer-sample`.* from `beer-sample` WHERE `type` = ? ORDER BY brewery_id, name"
timeout := 50 * time.Second
Expand Down Expand Up @@ -479,7 +526,7 @@ func TestAnalyticsQueryConnectContextTimeout(t *testing.T) {
// we can't use time travel here as we need the context to actually timeout
time.Sleep(100 * time.Millisecond)

return nil, context.Canceled
return nil, context.DeadlineExceeded
}

provider := &mockHTTPProvider{
Expand Down Expand Up @@ -530,7 +577,7 @@ func TestAnalyticsQueryConnectClusterTimeout(t *testing.T) {
// we can't use time travel here as we need the context to actually timeout
time.Sleep(100 * time.Millisecond)

return nil, context.Canceled
return nil, context.DeadlineExceeded
}

provider := &mockHTTPProvider{
Expand Down
Loading

0 comments on commit 4a9cd52

Please sign in to comment.