Skip to content

Commit

Permalink
GT-379 Retriable batch reads in AQL cursors - V2 (#509)
Browse files Browse the repository at this point in the history
  • Loading branch information
jwierzbo committed May 19, 2023
1 parent 0f52bdf commit 97a7273
Show file tree
Hide file tree
Showing 7 changed files with 274 additions and 23 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
- [V2] Fix AF mode in tests
- Support for optimizer rules in AQL query
- Add support for refilling index caches
- [V2] Retriable batch reads in AQL cursors

## [1.5.2](https://github.com/arangodb/go-driver/tree/v1.5.2) (2023-03-01)
- Bump `DRIVER_VERSION`
Expand Down
45 changes: 40 additions & 5 deletions v2/arangodb/cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,40 @@ type Cursor interface {
Plan() CursorPlan
}

// CursorBatch is returned from a query, used to iterate over a list of documents.
// In contrast to Cursor, CursorBatch does not load all documents into memory, but returns them in batches and allows for retries in case of errors.
// Note that a Cursor must always be closed to avoid holding on to resources in the server while they are no longer needed.
type CursorBatch interface {
io.Closer

// CloseWithContext run Close with specified Context
CloseWithContext(ctx context.Context) error

// HasMoreBatches returns true if the next call to ReadNextBatch does not return a NoMoreDocuments error.
HasMoreBatches() bool

// ReadNextBatch reads the next batch of documents from the cursor.
// The result must be a pointer to a slice of documents.
// E.g. `var result []MyStruct{}`.
ReadNextBatch(ctx context.Context, result interface{}) error

// RetryReadBatch retries the last batch read made by ReadNextBatch.
// The result must be a pointer to a slice of documents.
// E.g. `var result []MyStruct{}`.
RetryReadBatch(ctx context.Context, result interface{}) error

// Count returns the total number of result documents available.
// A valid return value is only available when the cursor has been created with `Count` and not with `Stream`.
Count() int64

// Statistics returns the query execution statistics for this cursor.
// This might not be valid if the cursor has been created with `Stream`
Statistics() CursorStats

// Plan returns the query execution plan for this cursor.
Plan() CursorPlan
}

// CursorStats TODO: all these int64 should be changed into uint64
type CursorStats struct {
// The total number of data-modification operations successfully executed.
Expand Down Expand Up @@ -92,11 +126,12 @@ type CursorStats struct {
}

type cursorData struct {
Count int64 `json:"count,omitempty"` // the total number of result documents available (only available if the query was executed with the count attribute set)
ID string `json:"id"` // id of temporary cursor created on the server (optional, see above)
Result jsonReader `json:"result,omitempty"` // a stream of result documents (might be empty if query has no results)
HasMore bool `json:"hasMore,omitempty"` // A boolean indicator whether there are more results available for the cursor on the server
Extra struct {
Count int64 `json:"count,omitempty"` // the total number of result documents available (only available if the query was executed with the count attribute set)
ID string `json:"id"` // id of temporary cursor created on the server (optional, see above)
Result jsonReader `json:"result,omitempty"` // a stream of result documents (might be empty if query has no results)
NextBatchID string `json:"nextBatchId,omitempty"` // id of the next batch of the cursor on the server when `allowRetry` option is true
HasMore bool `json:"hasMore,omitempty"` // A boolean indicator whether there are more results available for the cursor on the server
Extra struct {
Stats CursorStats `json:"stats,omitempty"`
// Plan describes plan for a cursor.
Plan CursorPlan `json:"plan,omitempty"`
Expand Down
76 changes: 61 additions & 15 deletions v2/arangodb/cursor_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,36 +22,47 @@ package arangodb

import (
"context"
"encoding/json"
"net/http"
"sync"

"github.com/arangodb/go-driver/v2/arangodb/shared"

"github.com/pkg/errors"

"github.com/arangodb/go-driver/v2/arangodb/shared"
"github.com/arangodb/go-driver/v2/connection"
)

func newCursor(db *database, endpoint string, data cursorData) *cursor {
return &cursor{
c := &cursor{
db: db,
endpoint: endpoint,
data: data,
}

if data.NextBatchID != "" {
c.retryData = &retryData{
cursorID: data.ID,
currentBatchID: "1",
}
}

return c
}

var _ Cursor = &cursor{}

type cursor struct {
db *database

endpoint string

closed bool

data cursorData
db *database
endpoint string
closed bool
data cursorData
lock sync.Mutex
retryData *retryData
}

lock sync.Mutex
type retryData struct {
cursorID string
currentBatchID string
}

func (c *cursor) Close() error {
Expand Down Expand Up @@ -93,20 +104,42 @@ func (c *cursor) HasMore() bool {
return c.data.Result.HasMore() || c.data.HasMore
}

func (c *cursor) HasMoreBatches() bool {
return c.data.HasMore
}

func (c *cursor) ReadDocument(ctx context.Context, result interface{}) (DocumentMeta, error) {
c.lock.Lock()
defer c.lock.Unlock()

return c.readDocument(ctx, result)
}

func (c *cursor) ReadNextBatch(ctx context.Context, result interface{}) error {
err := c.getNextBatch(ctx, "")
if err != nil {
return err
}

return json.Unmarshal(c.data.Result.in, result)
}

func (c *cursor) RetryReadBatch(ctx context.Context, result interface{}) error {
err := c.getNextBatch(ctx, c.retryData.currentBatchID)
if err != nil {
return err
}

return json.Unmarshal(c.data.Result.in, result)
}

func (c *cursor) readDocument(ctx context.Context, result interface{}) (DocumentMeta, error) {
if c.closed {
return DocumentMeta{}, shared.NoMoreDocumentsError{}
}

if !c.data.Result.HasMore() {
if err := c.getNextBatch(ctx); err != nil {
if err := c.getNextBatch(ctx, ""); err != nil {
return DocumentMeta{}, err
}
}
Expand All @@ -129,14 +162,27 @@ func (c *cursor) readDocument(ctx context.Context, result interface{}) (Document
return meta, nil
}

func (c *cursor) getNextBatch(ctx context.Context) error {
if !c.data.HasMore {
func (c *cursor) getNextBatch(ctx context.Context, retryBatchID string) error {
if !c.data.HasMore && retryBatchID == "" {
return errors.WithStack(shared.NoMoreDocumentsError{})
}

url := c.db.url("_api", "cursor", c.data.ID)
// If we have a NextBatchID, use it
if c.data.NextBatchID != "" {
url = c.db.url("_api", "cursor", c.data.ID, c.data.NextBatchID)
}
// We have to retry the batch instead of fetching the next one
if retryBatchID != "" {
url = c.db.url("_api", "cursor", c.retryData.cursorID, retryBatchID)
}

// Update currentBatchID before fetching the next batch (no retry case)
if c.data.NextBatchID != "" && retryBatchID == "" {
c.retryData.currentBatchID = c.data.NextBatchID
}

resp, err := connection.CallPut(ctx, c.db.connection(), url, &c.data, nil, c.db.modifiers...)
resp, err := connection.CallPost(ctx, c.db.connection(), url, &c.data, nil, c.db.modifiers...)
if err != nil {
return err
}
Expand Down
8 changes: 8 additions & 0 deletions v2/arangodb/database_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ type DatabaseQuery interface {
// Note that the returned Cursor must always be closed to avoid holding on to resources in the server while they are no longer needed.
Query(ctx context.Context, query string, opts *QueryOptions) (Cursor, error)

// QueryBatch performs an AQL query, returning a cursor used to iterate over the returned documents in batches.
// In contrast to Query, QueryBatch does not load all documents into memory, but returns them in batches and allows for retries in case of errors.
// Note that the returned Cursor must always be closed to avoid holding on to resources in the server while they are no longer needed
QueryBatch(ctx context.Context, query string, opts *QueryOptions, result interface{}) (CursorBatch, error)

// ValidateQuery validates an AQL query.
// When the query is valid, nil returned, otherwise an error is returned.
// The query is not executed.
Expand Down Expand Up @@ -73,6 +78,9 @@ type QuerySubOptions struct {
// or for queries that read data which are known to be outside of the hot set. By setting the option to false, data read by the query will not make it into
// the RocksDB block cache if not already in there, thus leaving more room for the actual hot set.
FillBlockCache bool `json:"fillBlockCache,omitempty"`
// AllowRetry If set to `true`, ArangoDB will store cursor results in such a way
// that batch reads can be retried in the case of a communication error.
AllowRetry bool `json:"allowRetry,omitempty"`
}

type QueryOptions struct {
Expand Down
14 changes: 14 additions & 0 deletions v2/arangodb/database_query_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ package arangodb

import (
"context"
"encoding/json"
"net/http"

"github.com/arangodb/go-driver/v2/arangodb/shared"
Expand All @@ -44,6 +45,10 @@ type databaseQuery struct {
}

func (d databaseQuery) Query(ctx context.Context, query string, opts *QueryOptions) (Cursor, error) {
return d.getCursor(ctx, query, opts, nil)
}

func (d databaseQuery) getCursor(ctx context.Context, query string, opts *QueryOptions, result interface{}) (*cursor, error) {
url := d.db.url("_api", "cursor")

req := struct {
Expand All @@ -66,12 +71,21 @@ func (d databaseQuery) Query(ctx context.Context, query string, opts *QueryOptio

switch code := resp.Code(); code {
case http.StatusCreated:
if result != nil {
if err := json.Unmarshal(response.cursorData.Result.in, result); err != nil {
return nil, err
}
}
return newCursor(d.db, resp.Endpoint(), response.cursorData), nil
default:
return nil, response.AsArangoErrorWithCode(code)
}
}

func (d databaseQuery) QueryBatch(ctx context.Context, query string, opts *QueryOptions, result interface{}) (CursorBatch, error) {
return d.getCursor(ctx, query, opts, result)
}

func (d databaseQuery) ValidateQuery(ctx context.Context, query string) error {
url := d.db.url("_api", "query")

Expand Down
Loading

0 comments on commit 97a7273

Please sign in to comment.