Skip to content

Commit

Permalink
MB-52148: Pass readUnits to query for Index3 and PrimaryIndex3 API
Browse files Browse the repository at this point in the history
Change-Id: Ie34fcb274d8ec8cd63ec75e505ebdc7ed5d5b74e
  • Loading branch information
ksaikrishnateja committed Jul 21, 2022
1 parent 3951512 commit 7a30a82
Show file tree
Hide file tree
Showing 12 changed files with 158 additions and 106 deletions.
11 changes: 5 additions & 6 deletions secondary/indexer/scan_coordinator.go
Expand Up @@ -250,8 +250,8 @@ func (s *scanCoordinator) handleSupvervisorCommands(cmd Message) {
/////////////////////////////////////////////////////////////////////////

// serverCallback is the single routine that starts each index scan.
func (s *scanCoordinator) serverCallback(protoReq interface{}, ctx interface{}, conn net.Conn,
cancelCh <-chan bool) {
func (s *scanCoordinator) serverCallback(protoReq interface{}, ctx interface{},
conn net.Conn, cancelCh <-chan bool, clientVersion uint32) {

if protoReq == queryport.Ping {
if ctx != nil {
Expand All @@ -269,8 +269,9 @@ func (s *scanCoordinator) serverCallback(protoReq interface{}, ctx interface{},
req, err := NewScanRequest(protoReq, ctx, cancelCh, s)
atime := time.Now()
w := NewProtoWriter(req.ScanType, conn)
var readUnits uint64 = 0
defer func() {
s.handleError(req.LogPrefix, w.Done())
s.handleError(req.LogPrefix, w.Done(readUnits, clientVersion))
req.Done()
}()

Expand Down Expand Up @@ -445,9 +446,7 @@ func (s *scanCoordinator) serverCallback(protoReq interface{}, ctx interface{},

// Do the scan
s.processRequest(req, w, is, t0)

// TODO: [ReadMetering] return readUnits to query
// readUnits = req.GetReadUnits()
readUnits = req.GetReadUnits()

if len(req.Ctxs) != 0 {
for _, ctx := range req.Ctxs {
Expand Down
15 changes: 12 additions & 3 deletions secondary/indexer/scan_protocol.go
Expand Up @@ -10,11 +10,12 @@ package indexer

import (
"encoding/binary"
"net"

"github.com/couchbase/indexing/secondary/common"
p "github.com/couchbase/indexing/secondary/pipeline"
protobuf "github.com/couchbase/indexing/secondary/protobuf/query"
"github.com/golang/protobuf/proto"
"net"
)

type ScanResponseWriter interface {
Expand All @@ -23,7 +24,7 @@ type ScanResponseWriter interface {
Count(count uint64) error
RawBytes([]byte) error
Row(pk, sk []byte) error
Done() error
Done(readUnits uint64, clientVersion uint32) error
Helo() error
}

Expand Down Expand Up @@ -152,7 +153,7 @@ func (w *protoResponseWriter) Row(pk, sk []byte) error {
return nil
}

func (w *protoResponseWriter) Done() error {
func (w *protoResponseWriter) Done(readUnits uint64, clientVersion uint32) error {
defer p.PutBlock(w.encBuf)
defer p.PutBlock(w.rowBuf)

Expand All @@ -164,5 +165,13 @@ func (w *protoResponseWriter) Done() error {
}
}

if clientVersion >= common.INDEXER_72_VERSION {
res := &protobuf.StreamEndResponse{
ReadUnits: proto.Uint64(readUnits),
}

return protobuf.EncodeAndWrite(w.conn, *w.encBuf, res)
}

return nil
}
4 changes: 2 additions & 2 deletions secondary/indexer/scan_request.go
Expand Up @@ -316,8 +316,6 @@ func NewScanRequest(protoReq interface{}, ctx interface{},

r.keySzCfg = getKeySizeConfig(cfg)

// TODO: [ReadMetering] Get User from query and populate request

switch req := protoReq.(type) {
case *protobuf.HeloRequest:
r.ScanType = HeloReq
Expand Down Expand Up @@ -379,6 +377,7 @@ func NewScanRequest(protoReq interface{}, ctx interface{},
case *protobuf.ScanRequest:
r.DefnID = req.GetDefnID()
r.RequestId = req.GetRequestId()
r.User = req.GetUser()
r.rollbackTime = req.GetRollbackTime()
r.PartitionIds = makePartitionIds(req.GetPartitionIds())
cons := common.Consistency(req.GetCons())
Expand Down Expand Up @@ -438,6 +437,7 @@ func NewScanRequest(protoReq interface{}, ctx interface{},
case *protobuf.ScanAllRequest:
r.DefnID = req.GetDefnID()
r.RequestId = req.GetRequestId()
r.User = req.GetUser()
r.rollbackTime = req.GetRollbackTime()
r.PartitionIds = makePartitionIds(req.GetPartitionIds())
cons := common.Consistency(req.GetCons())
Expand Down
15 changes: 11 additions & 4 deletions secondary/protobuf/query/query.go
@@ -1,10 +1,13 @@
package protoQuery

import "errors"
import json "github.com/couchbase/indexing/secondary/common/json"
import (
"errors"

import c "github.com/couchbase/indexing/secondary/common"
import "github.com/golang/protobuf/proto"
json "github.com/couchbase/indexing/secondary/common/json"

c "github.com/couchbase/indexing/secondary/common"
"github.com/golang/protobuf/proto"
)

// GetEntries implements queryport.client.ResponseReader{} method.
func (r *ResponseStream) GetEntries(dataEncFmt c.DataEncodingFormat) (*c.ScanResultEntries, [][]byte, error) {
Expand Down Expand Up @@ -50,6 +53,10 @@ func (r *ResponseStream) Error() error {
return nil
}

func (r *ResponseStream) GetReadUnits() uint64 {
return 0
}

// GetEntries implements queryport.client.ResponseReader{} method.
func (r *StreamEndResponse) GetEntries(dataEncFmt c.DataEncodingFormat) (*c.ScanResultEntries, [][]byte, error) {
var results c.ScanResultEntries
Expand Down
52 changes: 28 additions & 24 deletions secondary/protobuf/query/query.proto
Expand Up @@ -60,34 +60,36 @@ message StatisticsResponse {

// Scan request to indexer.
message ScanRequest {
required uint64 defnID = 1;
required Span span = 2;
required bool distinct = 3;
required int64 limit = 4;
required uint32 cons = 5;
optional TsConsistency vector = 6;
optional string requestId = 7;
repeated Scan scans = 8;
optional IndexProjection indexprojection = 9;
required uint64 defnID = 1;
required Span span = 2;
required bool distinct = 3;
required int64 limit = 4;
required uint32 cons = 5;
optional TsConsistency vector = 6;
optional string requestId = 7;
repeated Scan scans = 8;
optional IndexProjection indexprojection = 9;
optional bool reverse = 10;
optional int64 offset = 11;
optional int64 rollbackTime = 12;
repeated uint64 partitionIds = 13;
optional GroupAggr groupAggr = 14;
optional bool sorted = 15;
optional uint32 dataEncFmt = 16;
repeated uint64 partitionIds = 13;
optional GroupAggr groupAggr = 14;
optional bool sorted = 15;
optional uint32 dataEncFmt = 16;
optional string user = 17;
}

// Full table scan request from indexer.
message ScanAllRequest {
required uint64 defnID = 1;
required int64 limit = 2;
required uint32 cons = 3;
optional TsConsistency vector = 4;
optional string requestId = 5;
optional int64 rollbackTime = 6;
repeated uint64 partitionIds = 7;
optional uint32 dataEncFmt = 8;
required uint64 defnID = 1;
required int64 limit = 2;
required uint32 cons = 3;
optional TsConsistency vector = 4;
optional string requestId = 5;
optional int64 rollbackTime = 6;
repeated uint64 partitionIds = 7;
optional uint32 dataEncFmt = 8;
optional string user = 9;
}

// Request by client to stop streaming the query results.
Expand All @@ -101,7 +103,8 @@ message ResponseStream {

// Last response packet sent by server to end query results.
message StreamEndResponse {
optional Error err = 1;
optional Error err = 1;
optional uint64 readUnits = 2;
}

// Count request to indexer.
Expand Down Expand Up @@ -195,8 +198,9 @@ message GroupAggr {
// Queryport server authentication

message AuthRequest {
required string user = 1;
required string pass = 2;
required string user = 1;
required string pass = 2;
optional uint32 clientVersion = 3;
}

message AuthResponse {
Expand Down
14 changes: 9 additions & 5 deletions secondary/queryport/app.go
@@ -1,9 +1,13 @@
package queryport

import "github.com/couchbase/indexing/secondary/logging"
import c "github.com/couchbase/indexing/secondary/common"
import protobuf "github.com/couchbase/indexing/secondary/protobuf/query"
import "net"
import (
c "github.com/couchbase/indexing/secondary/common"
"github.com/couchbase/indexing/secondary/logging"

"net"

protobuf "github.com/couchbase/indexing/secondary/protobuf/query"
)

// Application is example application logic that uses query-port server
func Application(config c.Config) {
Expand All @@ -12,7 +16,7 @@ func Application(config c.Config) {
"",
"localhost:9990",
func(req interface{}, ctx interface{},
conn net.Conn, quitch <-chan bool) {
conn net.Conn, quitch <-chan bool, clientVersion uint32) {
requestHandler(req, conn, quitch, killch)
},
nil,
Expand Down
2 changes: 2 additions & 0 deletions secondary/queryport/client/client.go
Expand Up @@ -55,6 +55,8 @@ type ResponseReader interface {

// Error returns the error value, if nil there is no error.
Error() error

GetReadUnits() uint64
}

// ResponseSender is responsible for forwarding result to the client
Expand Down
38 changes: 21 additions & 17 deletions secondary/queryport/client/conn_pool.go
@@ -1,20 +1,23 @@
package client

import "errors"
import "fmt"
import "net"
import "time"
import "sync/atomic"

import "github.com/couchbase/indexing/secondary/common"
import "github.com/couchbase/indexing/secondary/logging"
import "github.com/couchbase/indexing/secondary/transport"
import "github.com/couchbase/indexing/secondary/security"

import "github.com/couchbase/cbauth"

import protobuf "github.com/couchbase/indexing/secondary/protobuf/query"
import gometrics "github.com/rcrowley/go-metrics"
import (
"errors"
"fmt"
"net"
"sync/atomic"
"time"

"github.com/couchbase/cbauth"
"github.com/couchbase/indexing/secondary/common"
"github.com/couchbase/indexing/secondary/logging"
"github.com/couchbase/indexing/secondary/security"
"github.com/couchbase/indexing/secondary/transport"
"github.com/golang/protobuf/proto"

protobuf "github.com/couchbase/indexing/secondary/protobuf/query"

gometrics "github.com/rcrowley/go-metrics"
)

const (
CONN_RELEASE_INTERVAL = 5 // Seconds. Don't change as long as go-metrics/ewma is being used.
Expand Down Expand Up @@ -158,8 +161,9 @@ func (cp *connectionPool) doAuth(conn *connection) error {

// Send Auth packet.
authReq := &protobuf.AuthRequest{
User: &user,
Pass: &pass,
User: &user,
Pass: &pass,
ClientVersion: proto.Uint32(uint32(common.INDEXER_CUR_VERSION)),
}

err = conn.pkt.Send(conn.conn, authReq)
Expand Down

0 comments on commit 7a30a82

Please sign in to comment.