Skip to content

Commit

Permalink
Merge remote-tracking branch 'couchbase/unstable' into HEAD
Browse files Browse the repository at this point in the history
http: //ci2i-unstable.northscale.in/gsi-18.01.2022-09.32.pass.html
Change-Id: I03733fec2d24aa0fb61babbaef57faf939f5b5dd
  • Loading branch information
amithk committed Jan 18, 2022
2 parents b169be4 + 3ee82a8 commit 011e24a
Show file tree
Hide file tree
Showing 13 changed files with 347 additions and 45 deletions.
18 changes: 18 additions & 0 deletions secondary/common/cluster_info.go
Expand Up @@ -778,6 +778,24 @@ func (c *ClusterInfoCache) GetActiveIndexerNodes() (nodes []couchbase.Node) {
return
}

func (c *ClusterInfoCache) GetServiceFromPort(addr string) (string, error) {

_, port, err := net.SplitHostPort(addr)
if err != nil {
return "", err
}

for _, node := range c.nodesvs {
for svc, svcPort := range node.Services {
if fmt.Sprint(svcPort) == port {
return svc, nil
}
}
}

return "", fmt.Errorf("Port number %v not found", port)
}

//
// Translate a hostport for one service for a node to the hostport
// for the other service on the same node.
Expand Down
7 changes: 7 additions & 0 deletions secondary/common/config.go
Expand Up @@ -566,6 +566,13 @@ var SystemConfig = Config{
false, // mutable
false, // case-insensitive
},
"indexer.dataport.enableAuth": ConfigValue{
false,
"force authentication for dataport server",
false,
true, // immutable
false, // case-insensitive
},
// indexer queryport configuration
"indexer.queryport.maxPayload": ConfigValue{
64 * 1024,
Expand Down
4 changes: 3 additions & 1 deletion secondary/dataport/app.go
Expand Up @@ -53,8 +53,10 @@ func Application(
return true
}

enableAuth := uint32(1)

appch := make(chan interface{}, 10000)
_, err := NewServer(addr, maxvbs, config, appch)
_, err := NewServer(addr, maxvbs, config, appch, &enableAuth)
if err != nil && doCallb(err) == false {
return
}
Expand Down
123 changes: 118 additions & 5 deletions secondary/dataport/endpoint.go
Expand Up @@ -32,6 +32,10 @@ import (
"github.com/couchbase/indexing/secondary/stats"
"github.com/couchbase/indexing/secondary/transport"

"github.com/couchbase/cbauth"

protobuf "github.com/couchbase/indexing/secondary/protobuf/data"

dcpTransport "github.com/couchbase/indexing/secondary/dcp/transport"
)

Expand Down Expand Up @@ -69,6 +73,8 @@ type RouterEndpoint struct {

// Mapping between vbuckets and keyspace
keyspaceIdVBMap map[string]map[uint16]bool

authHost string
}

type EndpointStats struct {
Expand Down Expand Up @@ -189,7 +195,6 @@ func NewRouterEndpoint(
keyspaceIdVBMap: make(map[string]map[uint16]bool),
}
endpoint.ch = make(chan []interface{}, endpoint.keyChSize)
endpoint.conn = conn

endpoint.stats.Init()
endpoint.stats.endpCh = endpoint.ch
Expand All @@ -200,14 +205,26 @@ func NewRouterEndpoint(
endpoint.pkt.SetEncoder(transport.EncodingProtobuf, protobufEncode)
endpoint.pkt.SetDecoder(transport.EncodingProtobuf, protobufDecode)

endpoint.bufferTm *= time.Millisecond
endpoint.harakiriTm *= time.Millisecond
endpoint.syncTm *= time.Millisecond

endpoint.logPrefix = fmt.Sprintf(
"ENDP[<-(%v,%4x)<-%v #%v]",
endpoint.raddr, uint16(endpoint.timestamp), cluster, topic)

// Ignore the error in initHostportForAuth, if any.
// It will be retried again in doAuth.
endpoint.initHostportForAuth()

// doAuth
if err := endpoint.doAuth(conn); err != nil {
logging.Errorf("%v doAuth error %v", endpoint.logPrefix, err)
return nil, err
}

endpoint.conn = conn

endpoint.bufferTm *= time.Millisecond
endpoint.harakiriTm *= time.Millisecond
endpoint.syncTm *= time.Millisecond

go endpoint.run(endpoint.ch)
logging.Infof("%v started ...\n", endpoint.logPrefix)
return endpoint, nil
Expand All @@ -228,6 +245,102 @@ func (endpoint *RouterEndpoint) Ping() bool {
return atomic.LoadUint32(&endpoint.done) == 0
}

func (endpoint *RouterEndpoint) initHostportForAuth() error {

// TODO: Use cluster info lite

clusterUrl, err := c.ClusterAuthUrl(endpoint.cluster)
if err != nil {
return err
}

cinfo, err := c.NewClusterInfoCache(clusterUrl, c.DEFAULT_POOL)
if err != nil {
return err
}

cinfo.Lock()
defer cinfo.Unlock()

cinfo.SetUserAgent(endpoint.logPrefix)

err = cinfo.FetchNodesAndSvsInfo()
if err != nil {
return err
}

service, err := cinfo.GetServiceFromPort(endpoint.raddr)
if err != nil {
endpoint.authHost = ""
return err
}

var authHost string
authHost, err = cinfo.TranslatePort(endpoint.raddr, service, c.INDEX_HTTP_SERVICE)
if err != nil {
endpoint.authHost = ""
return err
}

endpoint.authHost = authHost
return nil
}

func (endpoint *RouterEndpoint) getAuthInfo() (string, string, error) {

if endpoint.authHost == "" {
err := endpoint.initHostportForAuth()
if err != nil {
logging.Errorf("%v doAtuh error in initHostportForAuth: %v", endpoint.logPrefix, err)
return "", "", err
}
}

user, pass, err := cbauth.GetHTTPServiceAuth(endpoint.authHost)
if err != nil {
logging.Errorf("%v doAuth cbauth.GetHTTPServiceAuth returns error %v", endpoint.logPrefix, err)
return "", "", err
}

return user, pass, nil
}

func (endpoint *RouterEndpoint) doAuth(conn net.Conn) error {
// Check if auth is supported / configured before doing auth
if c.GetClusterVersion() < c.INDEXER_71_VERSION {
logging.Verbosef("%v doAuth Auth is not needed.", endpoint.logPrefix)
return nil
}

// Endpoint only sends the authRequest to the server. But it does not
// wait for any response from the server. This adheres to the current
// communication mechanism between endpoint and dataport server, i.e.
// one way streaming communication. If in case the server rejects
// the auth request, the endpoint will observe the connection being reset
// by the server and indexer will do the required stream clenaup and restart.

user, pass, err := endpoint.getAuthInfo()
if err != nil {
logging.Errorf("%v doAuth error %v in getAuthInfo", endpoint.logPrefix, err)
return err
}

// Send Auth packet.
authReq := &protobuf.AuthRequest{
User: &user,
Pass: &pass,
}

err = endpoint.pkt.Send(conn, authReq)
if err != nil {
logging.Errorf("%v doAuth pkt.Send returns error %v", endpoint.logPrefix, err)
return err
}

logging.Verbosef("%v doAuth auth sent", endpoint.logPrefix)
return nil
}

// ResetConfig synchronous call.
func (endpoint *RouterEndpoint) ResetConfig(config c.Config) error {
respch := make(chan []interface{}, 1)
Expand Down
6 changes: 6 additions & 0 deletions secondary/dataport/protobuf.go
Expand Up @@ -74,6 +74,12 @@ func protobufEncode(payload interface{}) (data []byte, err error) {
Vbuuids: val.Vbuuids,
Vbuckets: c.Vbno16to32(val.Vbuckets),
}

case *protobuf.AuthRequest:
pl.AuthRequest = &protobuf.AuthRequest{
User: proto.String(*val.User),
Pass: proto.String(*val.Pass),
}
}

if err == nil {
Expand Down

0 comments on commit 011e24a

Please sign in to comment.