Skip to content

Commit

Permalink
Merge remote-tracking branch 'couchbase/unstable' into HEAD
Browse files Browse the repository at this point in the history
http: //ci2i-unstable.northscale.in/gsi-10.01.2021-05.30.pass.html
Change-Id: I7f6924b916f6f3fae15648e056dd845d54cc2277
  • Loading branch information
jeelanp2003 committed Jan 10, 2021
2 parents 332c377 + c3c0827 commit 32d02fd
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 57 deletions.
Expand Up @@ -9,13 +9,14 @@
// client.Request(stats, stats)
// }

package adminport
package client

import "bytes"
import "io/ioutil"
import "net/http"
import "strings"
import "github.com/couchbase/indexing/secondary/security"
import apcommon "github.com/couchbase/indexing/secondary/adminport/common"

// httpClient is a concrete type implementing Client interface.
type httpClient struct {
Expand All @@ -25,7 +26,7 @@ type httpClient struct {
}

// NewHTTPClient returns a new instance of Client over HTTP.
func NewHTTPClient(listenAddr, urlPrefix string) (Client, error) {
func NewHTTPClient(listenAddr, urlPrefix string) (apcommon.Client, error) {
if !strings.HasPrefix(listenAddr, "http://") {
listenAddr = "http://" + listenAddr
}
Expand All @@ -43,7 +44,7 @@ func NewHTTPClient(listenAddr, urlPrefix string) (Client, error) {
}

// Request is part of `Client` interface
func (c *httpClient) Request(msg, resp MessageMarshaller) (err error) {
func (c *httpClient) Request(msg, resp apcommon.MessageMarshaller) (err error) {
return doResponse(func() (*http.Response, error) {
// marshall message
body, err := msg.Encode()
Expand All @@ -70,7 +71,7 @@ func (c *httpClient) Request(msg, resp MessageMarshaller) (err error) {
}, resp)
}

func doResponse(postRequest func() (*http.Response, error), resp MessageMarshaller) error {
func doResponse(postRequest func() (*http.Response, error), resp apcommon.MessageMarshaller) error {
htresp, err := postRequest() // get response back from server
if err != nil {
return err
Expand Down
Expand Up @@ -3,7 +3,7 @@
// request/response protocol, by default used http for transport and protobuf,
// JSON for payload. Admin port can typically be used for collecting
// statistics, administering and managing cluster.
package adminport
package common

import "errors"
import c "github.com/couchbase/indexing/secondary/common"
Expand Down
Expand Up @@ -24,7 +24,7 @@
// server.Stop()
// }

package adminport
package server

import "fmt"
import "expvar"
Expand All @@ -39,6 +39,7 @@ import "time"
import "github.com/couchbase/indexing/secondary/security"
import "github.com/couchbase/indexing/secondary/logging"
import c "github.com/couchbase/indexing/secondary/common"
import apcommon "github.com/couchbase/indexing/secondary/adminport/common"

// httpServer is a concrete type implementing adminport Server
// interface.
Expand All @@ -47,16 +48,17 @@ type httpServer struct {
lis net.Listener // TCP listener
mux *http.ServeMux
srv *http.Server // http server
messages map[string]MessageMarshaller
messages map[string]apcommon.MessageMarshaller
conns []net.Conn
reqch chan<- Request // request channel back to application
reqch chan<- apcommon.Request // request channel back to application

// config params
name string // human readable name for this server
laddr string // address to bind and listen
urlPrefix string // URL path prefix for adminport
rtimeout time.Duration
wtimeout time.Duration
rhtimeout time.Duration
maxHdrlen int

// local
Expand All @@ -68,9 +70,9 @@ type httpServer struct {

// NewHTTPServer creates an instance of admin-server.
// Start() will actually start the server.
func NewHTTPServer(config c.Config, reqch chan<- Request) (Server, error) {
func NewHTTPServer(config c.Config, reqch chan<- apcommon.Request) (apcommon.Server, error) {
s := &httpServer{
messages: make(map[string]MessageMarshaller),
messages: make(map[string]apcommon.MessageMarshaller),
conns: make([]net.Conn, 0),
reqch: reqch,
statsInBytes: 0.0,
Expand All @@ -82,6 +84,8 @@ func NewHTTPServer(config c.Config, reqch chan<- Request) (Server, error) {
urlPrefix: config["urlPrefix"].String(),
rtimeout: time.Duration(config["readTimeout"].Int()),
wtimeout: time.Duration(config["writeTimeout"].Int()),
rhtimeout: time.Duration(config["readHeaderTimeout"].Int()),

maxHdrlen: config["maxHeaderBytes"].Int(),
}
s.logPrefix = fmt.Sprintf("%s[%s]", s.name, s.laddr)
Expand All @@ -90,12 +94,13 @@ func NewHTTPServer(config c.Config, reqch chan<- Request) (Server, error) {
s.mux.HandleFunc(s.urlPrefix, s.systemHandler)
s.mux.HandleFunc("/debug/vars", s.expvarHandler)
s.srv = &http.Server{
Addr: s.laddr,
Handler: s.mux,
ConnState: s.connState,
ReadTimeout: s.rtimeout * time.Millisecond,
WriteTimeout: s.wtimeout * time.Millisecond,
MaxHeaderBytes: s.maxHdrlen,
Addr: s.laddr,
Handler: s.mux,
ConnState: s.connState,
ReadTimeout: s.rtimeout * time.Millisecond,
WriteTimeout: s.wtimeout * time.Millisecond,
ReadHeaderTimeout: s.rhtimeout * time.Millisecond,
MaxHeaderBytes: s.maxHdrlen,
}

if err := security.SecureServer(s.srv); err != nil {
Expand All @@ -118,13 +123,13 @@ func validateAuth(w http.ResponseWriter, r *http.Request) bool {
}

// Register is part of Server interface.
func (s *httpServer) Register(msg MessageMarshaller) (err error) {
func (s *httpServer) Register(msg apcommon.MessageMarshaller) (err error) {
s.mu.Lock()
defer s.mu.Unlock()

if s.lis != nil {
logging.Errorf("%v can't register, server already started\n", s.logPrefix)
return ErrorRegisteringRequest
return apcommon.ErrorRegisteringRequest
}
key := fmt.Sprintf("%v%v", s.urlPrefix, msg.Name())
s.messages[key] = msg
Expand All @@ -142,7 +147,7 @@ func (s *httpServer) RegisterHTTPHandler(

if s.lis != nil {
logging.Errorf("%v can't register, server already started\n", s.logPrefix)
return ErrorRegisteringRequest
return apcommon.ErrorRegisteringRequest
}

switch handl := handler.(type) {
Expand All @@ -158,18 +163,18 @@ func (s *httpServer) RegisterHTTPHandler(
}

// Unregister is part of Server interface.
func (s *httpServer) Unregister(msg MessageMarshaller) (err error) {
func (s *httpServer) Unregister(msg apcommon.MessageMarshaller) (err error) {
s.mu.Lock()
defer s.mu.Unlock()

if s.lis != nil {
logging.Errorf("%v can't unregister, server already started\n", s.logPrefix)
return ErrorRegisteringRequest
return apcommon.ErrorRegisteringRequest
}
name := msg.Name()
if _, ok := s.messages[name]; !ok {
logging.Errorf("%v message %q hasn't been registered\n", s.logPrefix, name)
return ErrorMessageUnknown
return apcommon.ErrorMessageUnknown
}
delete(s.messages, name)
logging.Infof("%s unregistered %s\n", s.logPrefix, s.getURL(msg))
Expand Down Expand Up @@ -199,7 +204,7 @@ func (s *httpServer) Start() (err error) {

if s.lis != nil {
logging.Errorf("%v already started ...\n", s.logPrefix)
return ErrorServerStarted
return apcommon.ErrorServerStarted
}

if s.lis, err = security.MakeListener(s.srv.Addr); err != nil {
Expand Down Expand Up @@ -285,22 +290,22 @@ func (s *httpServer) systemHandler(w http.ResponseWriter, r *http.Request) {
// get request message type.
msg, ok := s.messages[r.URL.Path]
if !ok {
err = ErrorPathNotFound
err = apcommon.ErrorPathNotFound
http.Error(w, "path not found", http.StatusNotFound)
return
}
// read request
dataIn = make([]byte, r.ContentLength)
if err := requestRead(r.Body, dataIn); err != nil {
err = fmt.Errorf("%v, %v", ErrorRequest, err)
err = fmt.Errorf("%v, %v", apcommon.ErrorRequest, err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
// Get an instance of request type and decode request into that.
typeOfMsg := reflect.ValueOf(msg).Elem().Type()
msg = reflect.New(typeOfMsg).Interface().(MessageMarshaller)
msg = reflect.New(typeOfMsg).Interface().(apcommon.MessageMarshaller)
if err = msg.Decode(dataIn); err != nil {
err = fmt.Errorf("%v, %v", ErrorDecodeRequest, err)
err = fmt.Errorf("%v, %v", apcommon.ErrorDecodeRequest, err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
Expand All @@ -311,19 +316,19 @@ func (s *httpServer) systemHandler(w http.ResponseWriter, r *http.Request) {
val := <-waitch

switch v := (val).(type) {
case MessageMarshaller:
case apcommon.MessageMarshaller:
if dataOut, err = v.Encode(); err == nil {
header := w.Header()
header["Content-Type"] = []string{v.ContentType()}
w.Write(dataOut)

} else {
err = fmt.Errorf("%v, %v", ErrorEncodeResponse, err)
err = fmt.Errorf("%v, %v", apcommon.ErrorEncodeResponse, err)
http.Error(w, err.Error(), http.StatusInternalServerError)
}

case error:
err = fmt.Errorf("%v, %v", ErrorInternal, v)
err = fmt.Errorf("%v, %v", apcommon.ErrorInternal, v)
http.Error(w, v.Error(), http.StatusInternalServerError)
}
}
Expand Down Expand Up @@ -364,7 +369,7 @@ func (s *httpServer) connState(conn net.Conn, state http.ConnState) {
}
}

func (s *httpServer) getURL(msg MessageMarshaller) string {
func (s *httpServer) getURL(msg apcommon.MessageMarshaller) string {
return s.urlPrefix + msg.Name()
}

Expand All @@ -389,17 +394,17 @@ func requestRead(r io.Reader, data []byte) (err error) {
// concrete type implementing Request interface
type httpAdminRequest struct {
srv *httpServer
msg MessageMarshaller
msg apcommon.MessageMarshaller
waitch chan interface{}
}

// GetMessage is part of Request interface.
func (r *httpAdminRequest) GetMessage() MessageMarshaller {
func (r *httpAdminRequest) GetMessage() apcommon.MessageMarshaller {
return r.msg
}

// Send is part of Request interface.
func (r *httpAdminRequest) Send(msg MessageMarshaller) error {
func (r *httpAdminRequest) Send(msg apcommon.MessageMarshaller) error {
r.waitch <- msg
close(r.waitch)
return nil
Expand Down
44 changes: 44 additions & 0 deletions secondary/common/config.go
Expand Up @@ -308,6 +308,15 @@ var SystemConfig = Config{
true, // immutable
false, // case-insensitive
},
"projector.adminport.readHeaderTimeout": ConfigValue{
5 * 1000,
"timeout in milliseconds, is http server's read header timeout, " +
"also refer to projector.dataport.harakiriTimeout, " +
"projector.adminport.readTimeout and indexer.dataport.tcpReadDeadline",
5 * 1000,
true, // immutable
false, // case-insensitive
},
"projector.adminport.writeTimeout": ConfigValue{
0,
"timeout in milliseconds, is http server's write timeout",
Expand Down Expand Up @@ -1160,6 +1169,41 @@ var SystemConfig = Config{
false, // mutable,
false, // case-insensitive
},
"indexer.plasma.AutoTuneDiskQuota": ConfigValue{
uint64(0),
"Disk Quota for LSS frag ratio tuning",
uint64(0),
false, // mutable,
false, // case-insensitive
},
"indexer.plasma.AutoTuneCleanerTargetFragRatio": ConfigValue{
50,
"Target LSS Cleaner fragmentation ratio for auto tuning",
50,
false, // mutable,
false, // case-insensitive
},
"indexer.plasma.AutoTuneCleanerMinBandwidthRatio": ConfigValue{
float64(0.1),
"Minimum bandwidth (percentage) allocated for LSS cleaning with auto tuning",
float64(0.1),
false, // mutable,
false, // case-insensitive
},
"indexer.plasma.AutoTuneDiskFullTimeLimit": ConfigValue{
3600,
"time allowance (in second) before disk is full",
3600,
false, // mutable,
false, // case-insensitive
},
"indexer.plasma.AutoTuneAvailDiskLimit": ConfigValue{
float64(0.9),
"percentage of available disk space reserved for plasma",
float64(0.9),
false, // mutable,
false, // case-insensitive
},
"indexer.plasma.MaxPageSize": ConfigValue{
192 * 1024,
"Used with AutoTuneLSSCleaner; target page size limit",
Expand Down
15 changes: 15 additions & 0 deletions secondary/indexer/plasma_slice.go
Expand Up @@ -311,6 +311,11 @@ func (slice *plasmaSlice) initStores() error {
cfg.CheckpointInterval = time.Second * time.Duration(slice.sysconf["plasma.checkpointInterval"].Int())
cfg.LSSCleanerConcurrency = slice.sysconf["plasma.LSSCleanerConcurrency"].Int()
cfg.AutoTuneLSSCleaning = slice.sysconf["plasma.AutoTuneLSSCleaner"].Bool()
cfg.AutoTuneDiskQuota = int64(slice.sysconf["plasma.AutoTuneDiskQuota"].Uint64())
cfg.AutoTuneCleanerTargetFragRatio = slice.sysconf["plasma.AutoTuneCleanerTargetFragRatio"].Int()
cfg.AutoTuneCleanerMinBandwidthRatio = slice.sysconf["plasma.AutoTuneCleanerMinBandwidthRatio"].Float64()
cfg.AutoTuneDiskFullTimeLimit = slice.sysconf["plasma.AutoTuneDiskFullTimeLimit"].Int()
cfg.AutoTuneAvailDiskLimit = slice.sysconf["plasma.AutoTuneAvailDiskLimit"].Float64()
cfg.Compression = slice.sysconf["plasma.compression"].String()
cfg.MaxPageSize = slice.sysconf["plasma.MaxPageSize"].Int()
cfg.AutoLSSCleaning = !slice.sysconf["settings.compaction.plasma.manual"].Bool()
Expand Down Expand Up @@ -2267,6 +2272,11 @@ func (mdb *plasmaSlice) UpdateConfig(cfg common.Config) {

updatePlasmaConfig(cfg)
mdb.mainstore.AutoTuneLSSCleaning = cfg["plasma.AutoTuneLSSCleaner"].Bool()
mdb.mainstore.AutoTuneDiskQuota = int64(cfg["plasma.AutoTuneDiskQuota"].Uint64())
mdb.mainstore.AutoTuneCleanerTargetFragRatio = cfg["plasma.AutoTuneCleanerTargetFragRatio"].Int()
mdb.mainstore.AutoTuneCleanerMinBandwidthRatio = cfg["plasma.AutoTuneCleanerMinBandwidthRatio"].Float64()
mdb.mainstore.AutoTuneDiskFullTimeLimit = cfg["plasma.AutoTuneDiskFullTimeLimit"].Int()
mdb.mainstore.AutoTuneAvailDiskLimit = cfg["plasma.AutoTuneAvailDiskLimit"].Float64()
mdb.mainstore.MaxPageSize = cfg["plasma.MaxPageSize"].Int()
mdb.mainstore.EnforceKeyRange = cfg["plasma.enforceKeyRange"].Bool()

Expand Down Expand Up @@ -2313,6 +2323,11 @@ func (mdb *plasmaSlice) UpdateConfig(cfg common.Config) {

if !mdb.isPrimary {
mdb.backstore.AutoTuneLSSCleaning = cfg["plasma.AutoTuneLSSCleaner"].Bool()
mdb.backstore.AutoTuneDiskQuota = int64(cfg["plasma.AutoTuneDiskQuota"].Uint64())
mdb.backstore.AutoTuneCleanerTargetFragRatio = cfg["plasma.AutoTuneCleanerTargetFragRatio"].Int()
mdb.backstore.AutoTuneCleanerMinBandwidthRatio = cfg["plasma.AutoTuneCleanerMinBandwidthRatio"].Float64()
mdb.backstore.AutoTuneDiskFullTimeLimit = cfg["plasma.AutoTuneDiskFullTimeLimit"].Int()
mdb.backstore.AutoTuneAvailDiskLimit = cfg["plasma.AutoTuneAvailDiskLimit"].Float64()
mdb.backstore.MaxPageSize = cfg["plasma.MaxPageSize"].Int()
mdb.backstore.EnforceKeyRange = cfg["plasma.enforceKeyRange"].Bool()
mdb.backstore.CheckpointInterval = mdb.mainstore.CheckpointInterval
Expand Down
8 changes: 4 additions & 4 deletions secondary/projector/adminport.go
Expand Up @@ -3,7 +3,7 @@ package projector
import "os"
import "time"

import ap "github.com/couchbase/indexing/secondary/adminport"
import apcommon "github.com/couchbase/indexing/secondary/adminport/common"
import c "github.com/couchbase/indexing/secondary/common"
import protobuf "github.com/couchbase/indexing/secondary/protobuf/projector"
import "github.com/couchbase/indexing/secondary/logging"
Expand All @@ -25,7 +25,7 @@ var reqStats = c.Statistics{}
var angioToken = uint16(1)

// admin-port entry point, once started never shutsdown.
func (p *Projector) mainAdminPort(reqch chan ap.Request) {
func (p *Projector) mainAdminPort(reqch chan apcommon.Request) {
p.admind.Register(reqVbmap)
p.admind.Register(reqFailoverLog)
p.admind.Register(reqMutationFeed)
Expand Down Expand Up @@ -87,8 +87,8 @@ loop:
}

// re-entrant / concurrent request handler.
func (p *Projector) handleRequest(req ap.Request, opaque uint16) {
var response ap.MessageMarshaller
func (p *Projector) handleRequest(req apcommon.Request, opaque uint16) {
var response apcommon.MessageMarshaller
var err error

msg := req.GetMessage()
Expand Down

0 comments on commit 32d02fd

Please sign in to comment.