Skip to content

Commit

Permalink
GOCBC-418: Add http streaming to analytics querying
Browse files Browse the repository at this point in the history
Motivation
----------
Some queries can return very large resultsets. At present, we read
this entire result into memory which could lead to memory issues.
Instead, we should stream the result from the server.

Changes
-------
Stream results from the server whilst not changing the public API.

Change-Id: I6af82ccac42d35c25e76b3c2034e08321f5fbbb3
Reviewed-on: http://review.couchbase.org/107079
Reviewed-by: Mike Goldsmith <goldsmith.mike@gmail.com>
Reviewed-by: Brett Lawson <brett19@gmail.com>
Tested-by: Charles Dixon <chvckd@gmail.com>
  • Loading branch information
chvck committed Apr 9, 2019
1 parent b9b06cd commit 44ba6f2
Show file tree
Hide file tree
Showing 4 changed files with 823 additions and 155 deletions.
115 changes: 82 additions & 33 deletions analyticsquery_deferred.go
Expand Up @@ -2,35 +2,34 @@ package gocb

import (
"encoding/json"
"fmt"
"io"
"time"

gocbcore "github.com/couchbase/gocbcore/v8"
"github.com/couchbase/gocbcore/v8"
"github.com/pkg/errors"
)

// AnalyticsDeferredResultHandle allows access to the handle of a deferred Analytics query.
//
// Experimental: This API is subject to change at any time.
type AnalyticsDeferredResultHandle interface {
One(valuePtr interface{}) error
Next(valuePtr interface{}) bool
NextBytes() []byte
Close() error

Status() (string, error)
}

type analyticsDeferredResultHandle struct {
type AnalyticsDeferredResultHandle struct {
handleUri string
status string
rows *analyticsRows
err error
provider httpProvider
hasResult bool
decoder *json.Decoder
timeout time.Duration
stream io.ReadCloser
}

type analyticsResponseHandle struct {
Status string `json:"status,omitempty"`
Handle string `json:"handle,omitempty"`
}

// Next assigns the next result from the results into the value pointer, returning whether the read was successful.
func (r *analyticsDeferredResultHandle) Next(valuePtr interface{}) bool {
func (r *AnalyticsDeferredResultHandle) Next(valuePtr interface{}) bool {
if r.err != nil {
return false
}
Expand All @@ -50,39 +49,85 @@ func (r *analyticsDeferredResultHandle) Next(valuePtr interface{}) bool {

// NextBytes returns the next result from the results as a byte array.
// TODO: how to deadline/timeout this?
func (r *analyticsDeferredResultHandle) NextBytes() []byte {
func (r *AnalyticsDeferredResultHandle) NextBytes() []byte {
if r.err != nil {
return nil
}

if r.status == "success" && !r.hasResult {
if r.status == "success" && r.decoder == nil {
req := &gocbcore.HttpRequest{
Service: gocbcore.CbasService,
Path: r.handleUri,
Method: "GET",
Service: gocbcore.CbasService,
Endpoint: r.handleUri,
Method: "GET",
}

resp, err := r.provider.DoHttpRequest(req)
if err != nil {
r.err = err
return nil
}
if resp.StatusCode != 200 {
r.err = fmt.Errorf("handle request failed, received http status %d", resp.StatusCode)
return nil
}

err := r.executeHandle(req, &r.rows.rows)
r.decoder = json.NewDecoder(resp.Body)
t, err := r.decoder.Token()
if err != nil {
bodyErr := resp.Body.Close()
if bodyErr != nil {
logDebugf("Failed to close response body, %s", bodyErr.Error())
}
r.err = err
return nil
}
r.hasResult = true
if delim, ok := t.(json.Delim); !ok || delim != '[' {
bodyErr := resp.Body.Close()
if bodyErr != nil {
logDebugf("Failed to close response body, %s", bodyErr.Error())
}
r.err = errors.New("expected response opening token to be [ but was " + t.(string))
return nil
}

r.stream = resp.Body
} else if r.status != "success" {
return nil
}

return r.rows.NextBytes()
return r.nextBytes()
}

func (r *AnalyticsDeferredResultHandle) nextBytes() []byte {
if r.decoder.More() {
var raw json.RawMessage
err := r.decoder.Decode(&raw)
if err != nil {
r.err = err
return nil
}

return raw
}

return nil
}

// Close marks the results as closed, returning any errors that occurred during reading the results.
func (r *analyticsDeferredResultHandle) Close() error {
r.rows.Close()
return r.err
func (r *AnalyticsDeferredResultHandle) Close() error {
if r.stream == nil {
return r.err
}

err := r.stream.Close()
if r.err != nil {
return r.err
}
return err
}

// One assigns the first value from the results into the value pointer.
func (r *analyticsDeferredResultHandle) One(valuePtr interface{}) error {
func (r *AnalyticsDeferredResultHandle) One(valuePtr interface{}) error {
if !r.Next(valuePtr) {
err := r.Close()
if err != nil {
Expand All @@ -103,31 +148,35 @@ func (r *analyticsDeferredResultHandle) One(valuePtr interface{}) error {

// Status triggers a network call to the handle URI, returning the current status of the long running query.
// TODO: how to deadline/timeout this?
func (r *analyticsDeferredResultHandle) Status() (string, error) {
func (r *AnalyticsDeferredResultHandle) Status() (string, error) {
req := &gocbcore.HttpRequest{
Service: gocbcore.CbasService,
Path: r.handleUri,
Method: "GET",
Service: gocbcore.CbasService,
Endpoint: r.handleUri,
Method: "GET",
}

var resp *analyticsResponseHandle
err := r.executeHandle(req, &resp)
if err != nil {
r.err = err
return "", err
}

r.status = resp.Status
r.handleUri = resp.Handle
if r.status == "success" {
r.handleUri = resp.Handle
}
return r.status, nil
}

// TODO: how to deadline/timeout this?
func (r *analyticsDeferredResultHandle) executeHandle(req *gocbcore.HttpRequest, valuePtr interface{}) error {
func (r *AnalyticsDeferredResultHandle) executeHandle(req *gocbcore.HttpRequest, valuePtr interface{}) error {
resp, err := r.provider.DoHttpRequest(req)
if err != nil {
return err
}
if resp.StatusCode != 200 {
return fmt.Errorf("handle request failed, received http status %d", resp.StatusCode)
}

jsonDec := json.NewDecoder(resp.Body)
err = jsonDec.Decode(valuePtr)
Expand Down
5 changes: 4 additions & 1 deletion analyticsquery_options.go
Expand Up @@ -11,7 +11,10 @@ import (

// AnalyticsQueryOptions is the set of options available to an Analytics query.
type AnalyticsQueryOptions struct {
ServerSideTimeout time.Duration
ServerSideTimeout time.Duration
// If Context is used then cancellation will only be applicable during initial http connect.
// If a timeout value is supplied with the context then that value will be propagated to the server
// and used to timeout the results stream.
Context context.Context
ParentSpanContext opentracing.SpanContext
Pretty bool
Expand Down

0 comments on commit 44ba6f2

Please sign in to comment.