Skip to content

Commit

Permalink
Feature/backport #3822 to 2.1.2 (#3843)
Browse files Browse the repository at this point in the history
* Fix for covering channels query (#3828)

* Fix for covering channels query

Adds EXPLAIN support so that unit test can validate queries are covering.

* Remove unused explain (moved down to bucket)

* Test modifications for 2.1 compatibility

Removes dependency on testify.
  • Loading branch information
adamcfraser authored and bbrks committed Dec 3, 2018
1 parent fea9947 commit 35fe28e
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 10 deletions.
14 changes: 14 additions & 0 deletions base/bucket_n1ql.go
Expand Up @@ -76,6 +76,20 @@ func (bucket *CouchbaseBucketGoCB) Query(statement string, params interface{}, c
return nil, err
}

func (bucket *CouchbaseBucketGoCB) ExplainQuery(statement string, params interface{}) (plan map[string]interface{}, err error) {
explainStatement := fmt.Sprintf("EXPLAIN %s", statement)

explainResults, explainErr := bucket.Query(explainStatement, params, gocb.RequestPlus, false)

if explainErr != nil {
return nil, explainErr
}

firstRow := explainResults.NextBytes()
unmarshalErr := json.Unmarshal(firstRow, &plan)
return plan, unmarshalErr
}

// CreateIndex issues a CREATE INDEX query in the current bucket, using the form:
// CREATE INDEX indexName ON bucket.Name(expression) WHERE filterExpression WITH options
// Sample usage with resulting statement:
Expand Down
38 changes: 29 additions & 9 deletions db/query.go
Expand Up @@ -75,8 +75,8 @@ var QueryChannels = SGQuery{
name: QueryTypeChannels,
statement: fmt.Sprintf(
"SELECT [op.name, LEAST($sync.sequence, op.val.seq),IFMISSING(op.val.rev,null),IFMISSING(op.val.del,null)][1] AS seq, "+
"[op.name, LEAST($sync, op.val.seq),IFMISSING(op.val.rev,null),IFMISSING(op.val.del,null)][2] AS rRev, "+
"[op.name, LEAST($sync, op.val.seq),IFMISSING(op.val.rev,null),IFMISSING(op.val.del,null)][3] AS rDel, "+
"[op.name, LEAST($sync.sequence, op.val.seq),IFMISSING(op.val.rev,null),IFMISSING(op.val.del,null)][2] AS rRev, "+
"[op.name, LEAST($sync.sequence, op.val.seq),IFMISSING(op.val.rev,null),IFMISSING(op.val.del,null)][3] AS rDel, "+
"$sync.rev AS rev, "+
"$sync.flags AS flags, "+
"META(`%s`).id AS id "+
Expand Down Expand Up @@ -239,13 +239,18 @@ func (context *DatabaseContext) QueryAccess(username string) (sgbucket.QueryResu
base.Warnf(base.KeyAll, "QueryAccess called with empty username - returning empty result iterator")
return &EmptyResultIterator{}, nil
}

accessQueryStatement := context.buildAccessQuery(username)
// Can't use prepared query because username is in select clause
accessQueryStatement := replaceSyncTokensQuery(QueryAccess.statement, context.UseXattrs())
accessQueryStatement = strings.Replace(accessQueryStatement, "$"+QueryParamUserName, username, -1)
return context.N1QLQueryWithStats(QueryAccess.name, accessQueryStatement, nil, gocb.RequestPlus, QueryAccess.adhoc)
}

// Builds the query statement for an access N1QL query.
func (context *DatabaseContext) buildAccessQuery(username string) string {
statement := replaceSyncTokensQuery(QueryAccess.statement, context.UseXattrs())
statement = strings.Replace(statement, "$"+QueryParamUserName, username, -1)
return statement
}

// Query to compute the set of roles granted to the specified user via the Sync Function
func (context *DatabaseContext) QueryRoleAccess(username string) (sgbucket.QueryResultIterator, error) {

Expand All @@ -261,12 +266,18 @@ func (context *DatabaseContext) QueryRoleAccess(username string) (sgbucket.Query
return &EmptyResultIterator{}, nil
}

accessQueryStatement := context.buildRoleAccessQuery(username)
// Can't use prepared query because username is in select clause
accessQueryStatement := replaceSyncTokensQuery(QueryRoleAccess.statement, context.UseXattrs())
accessQueryStatement = strings.Replace(accessQueryStatement, "$"+QueryParamUserName, username, -1)
return context.N1QLQueryWithStats(QueryTypeRoleAccess, accessQueryStatement, nil, gocb.RequestPlus, true)
}

// Builds the query statement for a roleAccess N1QL query.
func (context *DatabaseContext) buildRoleAccessQuery(username string) string {
statement := replaceSyncTokensQuery(QueryRoleAccess.statement, context.UseXattrs())
statement = strings.Replace(statement, "$"+QueryParamUserName, username, -1)
return statement
}

// Query to compute the set of documents assigned to the specified channel within the sequence range
func (context *DatabaseContext) QueryChannels(channelName string, startSeq uint64, endSeq uint64, limit int) (sgbucket.QueryResultIterator, error) {

Expand All @@ -280,6 +291,14 @@ func (context *DatabaseContext) QueryChannels(channelName string, startSeq uint6
// Standard channel index/query doesn't support the star channel. For star channel queries, QueryStarChannel
// (which is backed by IndexAllDocs) is used. The QueryStarChannel result schema is a subset of the
// QueryChannels result schema (removal handling isn't needed for the star channel).
channelQueryStatement, params := context.buildChannelsQuery(channelName, startSeq, endSeq, limit)

return context.N1QLQueryWithStats(QueryChannels.name, channelQueryStatement, params, gocb.RequestPlus, QueryChannels.adhoc)
}

// Builds the query statement and query parameters for a channels N1QL query. Also used by unit tests to validate
// query is covering.
func (context *DatabaseContext) buildChannelsQuery(channelName string, startSeq uint64, endSeq uint64, limit int) (statement string, params map[string]interface{}) {

channelQuery := QueryChannels
if channelName == "*" {
Expand All @@ -292,7 +311,7 @@ func (context *DatabaseContext) QueryChannels(channelName string, startSeq uint6
}

// Channel queries use a prepared query
params := make(map[string]interface{}, 3)
params = make(map[string]interface{}, 3)
params[QueryParamChannelName] = channelName
params[QueryParamStartSeq] = startSeq
if endSeq == 0 {
Expand All @@ -303,7 +322,8 @@ func (context *DatabaseContext) QueryChannels(channelName string, startSeq uint6
endSeq++
}
params[QueryParamEndSeq] = endSeq
return context.N1QLQueryWithStats(channelQuery.name, channelQueryStatement, params, gocb.RequestPlus, channelQuery.adhoc)

return channelQueryStatement, params
}

func (context *DatabaseContext) QueryResync() (sgbucket.QueryResultIterator, error) {
Expand Down
80 changes: 79 additions & 1 deletion db/query_test.go
Expand Up @@ -4,7 +4,7 @@ import (
"fmt"
"testing"

sgbucket "github.com/couchbase/sg-bucket"
"github.com/couchbase/sg-bucket"
"github.com/couchbase/sync_gateway/base"
"github.com/couchbaselabs/go.assert"
)
Expand Down Expand Up @@ -93,6 +93,84 @@ func TestQueryChannelsStatsN1ql(t *testing.T) {

}

// Validate that channels queries (channels, starChannel) are covering
func TestCoveringQueries(t *testing.T) {
if base.UnitTestUrlIsWalrus() {
t.Skip("This test is Couchbase Server only")
}

db, testBucket := setupTestDBWithCacheOptions(t, CacheOptions{})
defer testBucket.Close()
defer tearDownTestDB(t, db)

gocbBucket, ok := base.AsGoCBBucket(testBucket)
if !ok {
t.Errorf("Unable to get gocbBucket for testBucket")
}

// channels
channelsStatement, params := db.buildChannelsQuery("ABC", 0, 10, 100)
plan, explainErr := gocbBucket.ExplainQuery(channelsStatement, params)
assertNoError(t, explainErr, "Error generating explain for channels query")
covered := isCovered(plan)
assertTrue(t, covered, "Channel query isn't covered by index")

// star channel
channelStarStatement, params := db.buildChannelsQuery("*", 0, 10, 100)
plan, explainErr = gocbBucket.ExplainQuery(channelStarStatement, params)
assertNoError(t, explainErr, "Error generating explain for star channel query")
covered = isCovered(plan)
assertTrue(t, covered, "Star channel query isn't covered by index")

// Access and roleAccess currently aren't covering, because of the need to target the user property by name
// in the SELECT.
// Including here for ease-of-conversion when we get an indexing enhancement to support covered queries.
accessStatement := db.buildAccessQuery("user1")
plan, explainErr = gocbBucket.ExplainQuery(accessStatement, nil)
assertNoError(t, explainErr, "Error generating explain for access query")
covered = isCovered(plan)
//assert.True(t, covered, "Access query isn't covered by index")

// roleAccess
roleAccessStatement := db.buildRoleAccessQuery("user1")
plan, explainErr = gocbBucket.ExplainQuery(roleAccessStatement, nil)
assertNoError(t, explainErr, "Error generating explain for roleAccess query")
covered = isCovered(plan)
//assert.True(t, !covered, "RoleAccess query isn't covered by index")

}

// Parse the plan looking for use of the fetch operation (appears as the key/value pair "#operator":"Fetch")
// If there's no fetch operator in the plan, we can assume the query is covered by the index.
// The plan returned by an EXPLAIN is a nested hierarchy with operators potentially appearing at different
// depths, so need to traverse the JSON object.
// https://docs.couchbase.com/server/6.0/n1ql/n1ql-language-reference/explain.html
func isCovered(plan map[string]interface{}) bool {
for key, value := range plan {
switch value := value.(type) {
case string:
if key == "#operator" && value == "Fetch" {
return false
}
case map[string]interface{}:
if !isCovered(value) {
return false
}
case []interface{}:
for _, arrayValue := range value {
jsonArrayValue, ok := arrayValue.(map[string]interface{})
if ok {
if !isCovered(jsonArrayValue) {
return false
}
}
}
default:
}
}
return true
}

func countQueryResults(results sgbucket.QueryResultIterator) int {

count := 0
Expand Down

0 comments on commit 35fe28e

Please sign in to comment.