Skip to content

Commit

Permalink
datastore: enable to set query batch size
Browse files Browse the repository at this point in the history
This adds q.BatchSize() method to avoid the error that claims too many
datastore.next() calls due to the lack of specifying the `count`
property (= batch size).

Fixes golang#88
  • Loading branch information
delphinus committed Sep 24, 2017
1 parent 24e4144 commit 08ae4b5
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 7 deletions.
39 changes: 33 additions & 6 deletions datastore/query.go
Expand Up @@ -87,6 +87,7 @@ type Query struct {
eventual bool
limit int32
offset int32
count int32
start *pb.CompiledCursor
end *pb.CompiledCursor

Expand Down Expand Up @@ -241,6 +242,18 @@ func (q *Query) Offset(offset int) *Query {
return q
}

// BatchSize returns a derivative query to fetch the supplied number of results
// at once. This value should be equal to or less than the Limit.
func (q *Query) BatchSize(size int) *Query {
q = q.clone()
if size < math.MinInt32 || size > math.MaxInt32 {
q.err = errors.New("datastore: query batch size overflow")
return q
}
q.count = int32(size)
return q
}

// Start returns a derivative query with the given start point.
func (q *Query) Start(c Cursor) *Query {
q = q.clone()
Expand Down Expand Up @@ -325,6 +338,9 @@ func (q *Query) toProto(dst *pb.Query, appID string) error {
if q.offset != 0 {
dst.Offset = proto.Int32(q.offset)
}
if q.count != 0 {
dst.Count = proto.Int32(q.count)
}
dst.CompiledCursor = q.start
dst.EndCompiledCursor = q.end
dst.Compile = proto.Bool(true)
Expand Down Expand Up @@ -394,7 +410,7 @@ func (q *Query) Count(c context.Context) (int, error) {
if !res.GetMoreResults() {
break
}
if err := callNext(c, res, newQ.offset-n, 0); err != nil {
if err := callNext(c, res, newQ.offset-n, q.count); err != nil {
return 0, err
}
}
Expand All @@ -409,15 +425,15 @@ func (q *Query) Count(c context.Context) (int, error) {

// callNext issues a datastore_v3/Next RPC to advance a cursor, such as that
// returned by a query with more results.
func callNext(c context.Context, res *pb.QueryResult, offset, limit int32) error {
func callNext(c context.Context, res *pb.QueryResult, offset, count int32) error {
if res.Cursor == nil {
return errors.New("datastore: internal error: server did not return a cursor")
}
req := &pb.NextRequest{
Cursor: res.Cursor,
}
if limit >= 0 {
req.Count = proto.Int32(limit)
if count >= 0 {
req.Count = proto.Int32(count)
}
if offset != 0 {
req.Offset = proto.Int32(offset)
Expand Down Expand Up @@ -523,6 +539,7 @@ func (q *Query) Run(c context.Context) *Iterator {
t := &Iterator{
c: c,
limit: q.limit,
count: q.count,
q: q,
prevCC: q.start,
}
Expand All @@ -536,9 +553,13 @@ func (q *Query) Run(c context.Context) *Iterator {
return t
}
offset := q.offset - t.res.GetSkippedResults()
count := t.limit
if t.count > 0 && (count == 0 || t.count < count) {
count = t.count
}
for offset > 0 && t.res.GetMoreResults() {
t.prevCC = t.res.CompiledCursor
if err := callNext(t.c, &t.res, offset, t.limit); err != nil {
if err := callNext(t.c, &t.res, offset, count); err != nil {
t.err = err
break
}
Expand Down Expand Up @@ -566,6 +587,8 @@ type Iterator struct {
// limit is the limit on the number of results this iterator should return.
// A negative value means unlimited.
limit int32
// count is the number of results this iterator should fetch at once.
count int32
// q is the original query which yielded this iterator.
q *Query
// prevCC is the compiled cursor that marks the end of the previous batch
Expand Down Expand Up @@ -605,7 +628,11 @@ func (t *Iterator) next() (*Key, *pb.EntityProto, error) {
return nil, nil, t.err
}
t.prevCC = t.res.CompiledCursor
if err := callNext(t.c, &t.res, 0, t.limit); err != nil {
count := t.limit
if t.count > 0 && (count == 0 || t.count < count) {
count = t.count
}
if err := callNext(t.c, &t.res, 0, count); err != nil {
t.err = err
return nil, nil, t.err
}
Expand Down
3 changes: 2 additions & 1 deletion datastore/query_test.go
Expand Up @@ -464,7 +464,7 @@ func TestQueryToProto(t *testing.T) {
},
{
desc: "standard query",
query: NewQuery("kind").Order("-I").Filter("I >", 17).Filter("U =", "Dave").Limit(7).Offset(42),
query: NewQuery("kind").Order("-I").Filter("I >", 17).Filter("U =", "Dave").Limit(7).Offset(42).BatchSize(5),
want: &pb.Query{
Kind: proto.String("kind"),
Filter: []*pb.Query_Filter{
Expand Down Expand Up @@ -497,6 +497,7 @@ func TestQueryToProto(t *testing.T) {
},
Limit: proto.Int32(7),
Offset: proto.Int32(42),
Count: proto.Int32(5),
},
},
{
Expand Down

0 comments on commit 08ae4b5

Please sign in to comment.