diff --git a/secondary/indexer/scan_coordinator.go b/secondary/indexer/scan_coordinator.go index 3644a7b75..a8c87e242 100644 --- a/secondary/indexer/scan_coordinator.go +++ b/secondary/indexer/scan_coordinator.go @@ -250,8 +250,8 @@ func (s *scanCoordinator) handleSupvervisorCommands(cmd Message) { ///////////////////////////////////////////////////////////////////////// // serverCallback is the single routine that starts each index scan. -func (s *scanCoordinator) serverCallback(protoReq interface{}, ctx interface{}, conn net.Conn, - cancelCh <-chan bool) { +func (s *scanCoordinator) serverCallback(protoReq interface{}, ctx interface{}, + conn net.Conn, cancelCh <-chan bool, clientVersion uint32) { if protoReq == queryport.Ping { if ctx != nil { @@ -269,8 +269,9 @@ func (s *scanCoordinator) serverCallback(protoReq interface{}, ctx interface{}, req, err := NewScanRequest(protoReq, ctx, cancelCh, s) atime := time.Now() w := NewProtoWriter(req.ScanType, conn) + var readUnits uint64 = 0 defer func() { - s.handleError(req.LogPrefix, w.Done()) + s.handleError(req.LogPrefix, w.Done(readUnits, clientVersion)) req.Done() }() @@ -445,9 +446,7 @@ func (s *scanCoordinator) serverCallback(protoReq interface{}, ctx interface{}, // Do the scan s.processRequest(req, w, is, t0) - - // TODO: [ReadMetering] return readUnits to query - // readUnits = req.GetReadUnits() + readUnits = req.GetReadUnits() if len(req.Ctxs) != 0 { for _, ctx := range req.Ctxs { diff --git a/secondary/indexer/scan_protocol.go b/secondary/indexer/scan_protocol.go index ba2c309f5..39324d0e5 100644 --- a/secondary/indexer/scan_protocol.go +++ b/secondary/indexer/scan_protocol.go @@ -10,11 +10,12 @@ package indexer import ( "encoding/binary" + "net" + "github.com/couchbase/indexing/secondary/common" p "github.com/couchbase/indexing/secondary/pipeline" protobuf "github.com/couchbase/indexing/secondary/protobuf/query" "github.com/golang/protobuf/proto" - "net" ) type ScanResponseWriter interface { @@ -23,7 +24,7 @@ type ScanResponseWriter interface { Count(count uint64) error RawBytes([]byte) error Row(pk, sk []byte) error - Done() error + Done(readUnits uint64, clientVersion uint32) error Helo() error } @@ -152,7 +153,7 @@ func (w *protoResponseWriter) Row(pk, sk []byte) error { return nil } -func (w *protoResponseWriter) Done() error { +func (w *protoResponseWriter) Done(readUnits uint64, clientVersion uint32) error { defer p.PutBlock(w.encBuf) defer p.PutBlock(w.rowBuf) @@ -164,5 +165,13 @@ func (w *protoResponseWriter) Done() error { } } + if clientVersion >= common.INDEXER_72_VERSION { + res := &protobuf.StreamEndResponse{ + ReadUnits: proto.Uint64(readUnits), + } + + return protobuf.EncodeAndWrite(w.conn, *w.encBuf, res) + } + return nil } diff --git a/secondary/indexer/scan_request.go b/secondary/indexer/scan_request.go index 37e69591a..7dcaa0de0 100644 --- a/secondary/indexer/scan_request.go +++ b/secondary/indexer/scan_request.go @@ -316,8 +316,6 @@ func NewScanRequest(protoReq interface{}, ctx interface{}, r.keySzCfg = getKeySizeConfig(cfg) - // TODO: [ReadMetering] Get User from query and populate request - switch req := protoReq.(type) { case *protobuf.HeloRequest: r.ScanType = HeloReq @@ -379,6 +377,7 @@ func NewScanRequest(protoReq interface{}, ctx interface{}, case *protobuf.ScanRequest: r.DefnID = req.GetDefnID() r.RequestId = req.GetRequestId() + r.User = req.GetUser() r.rollbackTime = req.GetRollbackTime() r.PartitionIds = makePartitionIds(req.GetPartitionIds()) cons := common.Consistency(req.GetCons()) @@ -438,6 +437,7 @@ func NewScanRequest(protoReq interface{}, ctx interface{}, case *protobuf.ScanAllRequest: r.DefnID = req.GetDefnID() r.RequestId = req.GetRequestId() + r.User = req.GetUser() r.rollbackTime = req.GetRollbackTime() r.PartitionIds = makePartitionIds(req.GetPartitionIds()) cons := common.Consistency(req.GetCons()) diff --git a/secondary/protobuf/query/query.go b/secondary/protobuf/query/query.go index 7138f50d0..59f15cb34 100644 --- a/secondary/protobuf/query/query.go +++ b/secondary/protobuf/query/query.go @@ -1,10 +1,13 @@ package protoQuery -import "errors" -import json "github.com/couchbase/indexing/secondary/common/json" +import ( + "errors" -import c "github.com/couchbase/indexing/secondary/common" -import "github.com/golang/protobuf/proto" + json "github.com/couchbase/indexing/secondary/common/json" + + c "github.com/couchbase/indexing/secondary/common" + "github.com/golang/protobuf/proto" +) // GetEntries implements queryport.client.ResponseReader{} method. func (r *ResponseStream) GetEntries(dataEncFmt c.DataEncodingFormat) (*c.ScanResultEntries, [][]byte, error) { @@ -50,6 +53,10 @@ func (r *ResponseStream) Error() error { return nil } +func (r *ResponseStream) GetReadUnits() uint64 { + return 0 +} + // GetEntries implements queryport.client.ResponseReader{} method. func (r *StreamEndResponse) GetEntries(dataEncFmt c.DataEncodingFormat) (*c.ScanResultEntries, [][]byte, error) { var results c.ScanResultEntries diff --git a/secondary/protobuf/query/query.proto b/secondary/protobuf/query/query.proto index fc215e5b9..21e903605 100644 --- a/secondary/protobuf/query/query.proto +++ b/secondary/protobuf/query/query.proto @@ -60,34 +60,36 @@ message StatisticsResponse { // Scan request to indexer. message ScanRequest { - required uint64 defnID = 1; - required Span span = 2; - required bool distinct = 3; - required int64 limit = 4; - required uint32 cons = 5; - optional TsConsistency vector = 6; - optional string requestId = 7; - repeated Scan scans = 8; - optional IndexProjection indexprojection = 9; + required uint64 defnID = 1; + required Span span = 2; + required bool distinct = 3; + required int64 limit = 4; + required uint32 cons = 5; + optional TsConsistency vector = 6; + optional string requestId = 7; + repeated Scan scans = 8; + optional IndexProjection indexprojection = 9; optional bool reverse = 10; optional int64 offset = 11; optional int64 rollbackTime = 12; - repeated uint64 partitionIds = 13; - optional GroupAggr groupAggr = 14; - optional bool sorted = 15; - optional uint32 dataEncFmt = 16; + repeated uint64 partitionIds = 13; + optional GroupAggr groupAggr = 14; + optional bool sorted = 15; + optional uint32 dataEncFmt = 16; + optional string user = 17; } // Full table scan request from indexer. message ScanAllRequest { - required uint64 defnID = 1; - required int64 limit = 2; - required uint32 cons = 3; - optional TsConsistency vector = 4; - optional string requestId = 5; - optional int64 rollbackTime = 6; - repeated uint64 partitionIds = 7; - optional uint32 dataEncFmt = 8; + required uint64 defnID = 1; + required int64 limit = 2; + required uint32 cons = 3; + optional TsConsistency vector = 4; + optional string requestId = 5; + optional int64 rollbackTime = 6; + repeated uint64 partitionIds = 7; + optional uint32 dataEncFmt = 8; + optional string user = 9; } // Request by client to stop streaming the query results. @@ -101,7 +103,8 @@ message ResponseStream { // Last response packet sent by server to end query results. message StreamEndResponse { - optional Error err = 1; + optional Error err = 1; + optional uint64 readUnits = 2; } // Count request to indexer. @@ -195,8 +198,9 @@ message GroupAggr { // Queryport server authentication message AuthRequest { - required string user = 1; - required string pass = 2; + required string user = 1; + required string pass = 2; + optional uint32 clientVersion = 3; } message AuthResponse { diff --git a/secondary/queryport/app.go b/secondary/queryport/app.go index 80b13d0be..d6f0dd44f 100644 --- a/secondary/queryport/app.go +++ b/secondary/queryport/app.go @@ -1,9 +1,13 @@ package queryport -import "github.com/couchbase/indexing/secondary/logging" -import c "github.com/couchbase/indexing/secondary/common" -import protobuf "github.com/couchbase/indexing/secondary/protobuf/query" -import "net" +import ( + c "github.com/couchbase/indexing/secondary/common" + "github.com/couchbase/indexing/secondary/logging" + + "net" + + protobuf "github.com/couchbase/indexing/secondary/protobuf/query" +) // Application is example application logic that uses query-port server func Application(config c.Config) { @@ -12,7 +16,7 @@ func Application(config c.Config) { "", "localhost:9990", func(req interface{}, ctx interface{}, - conn net.Conn, quitch <-chan bool) { + conn net.Conn, quitch <-chan bool, clientVersion uint32) { requestHandler(req, conn, quitch, killch) }, nil, diff --git a/secondary/queryport/client/client.go b/secondary/queryport/client/client.go index 96162dc21..7a6607c76 100644 --- a/secondary/queryport/client/client.go +++ b/secondary/queryport/client/client.go @@ -55,6 +55,8 @@ type ResponseReader interface { // Error returns the error value, if nil there is no error. Error() error + + GetReadUnits() uint64 } // ResponseSender is responsible for forwarding result to the client diff --git a/secondary/queryport/client/conn_pool.go b/secondary/queryport/client/conn_pool.go index 139858699..5b6b99385 100644 --- a/secondary/queryport/client/conn_pool.go +++ b/secondary/queryport/client/conn_pool.go @@ -1,20 +1,23 @@ package client -import "errors" -import "fmt" -import "net" -import "time" -import "sync/atomic" - -import "github.com/couchbase/indexing/secondary/common" -import "github.com/couchbase/indexing/secondary/logging" -import "github.com/couchbase/indexing/secondary/transport" -import "github.com/couchbase/indexing/secondary/security" - -import "github.com/couchbase/cbauth" - -import protobuf "github.com/couchbase/indexing/secondary/protobuf/query" -import gometrics "github.com/rcrowley/go-metrics" +import ( + "errors" + "fmt" + "net" + "sync/atomic" + "time" + + "github.com/couchbase/cbauth" + "github.com/couchbase/indexing/secondary/common" + "github.com/couchbase/indexing/secondary/logging" + "github.com/couchbase/indexing/secondary/security" + "github.com/couchbase/indexing/secondary/transport" + "github.com/golang/protobuf/proto" + + protobuf "github.com/couchbase/indexing/secondary/protobuf/query" + + gometrics "github.com/rcrowley/go-metrics" +) const ( CONN_RELEASE_INTERVAL = 5 // Seconds. Don't change as long as go-metrics/ewma is being used. @@ -158,8 +161,9 @@ func (cp *connectionPool) doAuth(conn *connection) error { // Send Auth packet. authReq := &protobuf.AuthRequest{ - User: &user, - Pass: &pass, + User: &user, + Pass: &pass, + ClientVersion: proto.Uint32(uint32(common.INDEXER_CUR_VERSION)), } err = conn.pkt.Send(conn.conn, authReq) diff --git a/secondary/queryport/client/scan_client.go b/secondary/queryport/client/scan_client.go index 6fc8a0805..929d99ff1 100644 --- a/secondary/queryport/client/scan_client.go +++ b/secondary/queryport/client/scan_client.go @@ -10,19 +10,23 @@ package client -import "errors" -import "fmt" -import "io" -import "net" -import "time" -import json "github.com/couchbase/indexing/secondary/common/json" -import "sync/atomic" - -import "github.com/couchbase/indexing/secondary/logging" -import "github.com/couchbase/indexing/secondary/common" -import protobuf "github.com/couchbase/indexing/secondary/protobuf/query" -import "github.com/couchbase/indexing/secondary/transport" -import "github.com/golang/protobuf/proto" +import ( + "errors" + "fmt" + "io" + "net" + "time" + + "sync/atomic" + + "github.com/couchbase/indexing/secondary/common" + json "github.com/couchbase/indexing/secondary/common/json" + "github.com/couchbase/indexing/secondary/logging" + + protobuf "github.com/couchbase/indexing/secondary/protobuf/query" + "github.com/couchbase/indexing/secondary/transport" + "github.com/golang/protobuf/proto" +) // GsiScanClient for scan operations. type GsiScanClient struct { @@ -1352,8 +1356,10 @@ REQUEST_RESPONSE_RETRY: healthy = false return nil, err } else if endResp != nil { - healthy = false - return nil, ErrorProtocol + if _, ok := endResp.(*protobuf.StreamEndResponse); !ok { + healthy = false + return nil, ErrorProtocol + } } return resp, nil } @@ -1397,34 +1403,38 @@ func (c *GsiScanClient) streamResponse( callb(&protobuf.StreamEndResponse{}) // callback most likely return true cont, healthy = false, true - } else { - if rsp, ok := resp.(*protobuf.AuthResponse); ok { - - // When the cluster upgrade completes and the queryport starts supporting - // auth. See doRequestResponse for more details. + } else if ar, ok := resp.(*protobuf.AuthResponse); ok { + // When the cluster upgrade completes and the queryport starts supporting + // auth. See doRequestResponse for more details. - atomic.StoreUint32(c.needsAuth, uint32(1)) - - if rsp.GetCode() == transport.AUTH_MISSING { - // Do not count this as a "retry" - logging.Infof("%v server needs authentication information. Retrying "+ - "request with auth req(%v)", c.logPrefix, requestId) - // TODO: Update cluster version + atomic.StoreUint32(c.needsAuth, uint32(1)) - // Perform authRetry only once. - if !authRetryOnce { - authRetry = true - } + if ar.GetCode() == transport.AUTH_MISSING { + // Do not count this as a "retry" + logging.Infof("%v server needs authentication information. Retrying "+ + "request with auth req(%v)", c.logPrefix, requestId) + // TODO: Update cluster version - return + // Perform authRetry only once. + if !authRetryOnce { + authRetry = true } - } - streamResp := resp.(*protobuf.ResponseStream) + return + } + } else if streamResp, ok := resp.(*protobuf.ResponseStream); ok { if err = streamResp.Error(); err == nil { cont = callb(streamResp) } healthy = true + } else if ser, ok := resp.(*protobuf.StreamEndResponse); ok { + finish = true + logging.Tracef("%v req(%v) connection %q received StreamEndResponse", c.logPrefix, requestId, laddr) + callb(ser) // callback most likely return true + cont, healthy = false, true + } else { + cont, healthy = false, false + err = ErrorProtocol } if cont == false && healthy == true && finish == false { diff --git a/secondary/queryport/client/scatter.go b/secondary/queryport/client/scatter.go index 66145d9a5..697fdef37 100644 --- a/secondary/queryport/client/scatter.go +++ b/secondary/queryport/client/scatter.go @@ -1412,6 +1412,10 @@ func (d *bypassResponseReader) Error() error { return nil } +func (d *bypassResponseReader) GetReadUnits() uint64 { + return 0 +} + func makeDefaultRequestBroker(cb ResponseHandler, dataEncFmt common.DataEncodingFormat) *RequestBroker { diff --git a/secondary/queryport/n1ql/secondary_index.go b/secondary/queryport/n1ql/secondary_index.go index 9d82f1c4e..919da3c00 100644 --- a/secondary/queryport/n1ql/secondary_index.go +++ b/secondary/queryport/n1ql/secondary_index.go @@ -36,6 +36,7 @@ import ( "github.com/couchbase/query/errors" "github.com/couchbase/query/expression" "github.com/couchbase/query/expression/parser" + "github.com/couchbase/query/tenant" "github.com/couchbase/query/timestamp" "github.com/couchbase/query/value" @@ -1766,6 +1767,11 @@ func makeResponsehandler( // scatter/gather. // return func(data qclient.ResponseReader) bool { + if data.GetReadUnits() != 0 { + readUnits := data.GetReadUnits() + conn.RecordGsiRU(tenant.Unit(readUnits)) + } + err := data.Error() if err != nil { conn.Error(n1qlError(client, err)) diff --git a/secondary/queryport/server.go b/secondary/queryport/server.go index 6e61566af..b10a9818b 100644 --- a/secondary/queryport/server.go +++ b/secondary/queryport/server.go @@ -10,6 +10,7 @@ import ( "syscall" "time" + "github.com/couchbase/indexing/secondary/common" "github.com/couchbase/indexing/secondary/logging" "github.com/couchbase/indexing/secondary/security" @@ -26,13 +27,15 @@ import ( // channel, until `quitch` is closed. When there are // no more response to post handler shall close `respch`. type RequestHandler func( - req interface{}, ctx interface{}, conn net.Conn, quitch <-chan bool) + req interface{}, ctx interface{}, conn net.Conn, quitch <-chan bool, + clientVersion uint32) type ConnectionHandler func() interface{} type request struct { - r interface{} - quitch chan bool + r interface{} + quitch chan bool + clientVer uint32 } var Ping *request = &request{} @@ -263,7 +266,7 @@ func (s *Server) enforceAuth(raddr string) bool { return false } -func (s *Server) doAuth(conn net.Conn) (interface{}, error) { +func (s *Server) doAuth(conn net.Conn) (interface{}, uint32, error) { // TODO: Some code deduplication with doReveive can be done. raddr := conn.RemoteAddr().String() @@ -283,7 +286,7 @@ func (s *Server) doAuth(conn net.Conn) (interface{}, error) { reqMsg, err := rpkt.Receive(conn) if err != nil { - return nil, err + return nil, 0, err } // Reset read deadline @@ -299,7 +302,7 @@ func (s *Server) doAuth(conn net.Conn) (interface{}, error) { logging.Infof("%v connection %q doAuth() authentication is missing", s.logPrefix, raddr) if !s.enforceAuth(raddr) { - return reqMsg, nil + return reqMsg, 0, nil } code = transport.AUTH_MISSING @@ -325,21 +328,21 @@ func (s *Server) doAuth(conn net.Conn) (interface{}, error) { err = rpkt.Send(conn, resp) if err != nil { - return nil, err + return nil, req.GetClientVersion(), err } if authErr == nil { logging.Verbosef("%v connection %q auth successful", s.logPrefix, raddr) } - return nil, authErr + return nil, req.GetClientVersion(), authErr } // handle connection request. connection might be kept open in client's // connection pool. func (s *Server) handleConnection(conn net.Conn) { - req, err := s.doAuth(conn) + req, clientVersion, err := s.doAuth(conn) if err != nil { // On authentication error, just close the connection. Client // will try with a new connection by sending AuthRequest. @@ -382,8 +385,8 @@ func (s *Server) handleConnection(conn net.Conn) { } for req := range rcvch { - s.callb(req.r, ctx, conn, req.quitch) // blocking call - if req.r != Ping { + s.callb(req.r, ctx, conn, req.quitch, clientVersion) // blocking call + if clientVersion < common.INDEXER_72_VERSION && req.r != Ping { transport.SendResponseEnd(conn) } }