Skip to content

Commit

Permalink
MB-37480: Set default HTTP read header timeout for projector to 5 sec…
Browse files Browse the repository at this point in the history
…onds.

Change-Id: I608e881e475ce9ffc06b0d2cfe29b2290f9adca3
  • Loading branch information
amithk committed Jan 9, 2021
1 parent f78a05e commit c5573f0
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 57 deletions.
Expand Up @@ -9,13 +9,14 @@
// client.Request(stats, stats)
// }

package adminport
package client

import "bytes"
import "io/ioutil"
import "net/http"
import "strings"
import "github.com/couchbase/indexing/secondary/security"
import apcommon "github.com/couchbase/indexing/secondary/adminport/common"

// httpClient is a concrete type implementing Client interface.
type httpClient struct {
Expand All @@ -25,7 +26,7 @@ type httpClient struct {
}

// NewHTTPClient returns a new instance of Client over HTTP.
func NewHTTPClient(listenAddr, urlPrefix string) (Client, error) {
func NewHTTPClient(listenAddr, urlPrefix string) (apcommon.Client, error) {
if !strings.HasPrefix(listenAddr, "http://") {
listenAddr = "http://" + listenAddr
}
Expand All @@ -43,7 +44,7 @@ func NewHTTPClient(listenAddr, urlPrefix string) (Client, error) {
}

// Request is part of `Client` interface
func (c *httpClient) Request(msg, resp MessageMarshaller) (err error) {
func (c *httpClient) Request(msg, resp apcommon.MessageMarshaller) (err error) {
return doResponse(func() (*http.Response, error) {
// marshall message
body, err := msg.Encode()
Expand All @@ -70,7 +71,7 @@ func (c *httpClient) Request(msg, resp MessageMarshaller) (err error) {
}, resp)
}

func doResponse(postRequest func() (*http.Response, error), resp MessageMarshaller) error {
func doResponse(postRequest func() (*http.Response, error), resp apcommon.MessageMarshaller) error {
htresp, err := postRequest() // get response back from server
if err != nil {
return err
Expand Down
Expand Up @@ -3,7 +3,7 @@
// request/response protocol, by default used http for transport and protobuf,
// JSON for payload. Admin port can typically be used for collecting
// statistics, administering and managing cluster.
package adminport
package common

import "errors"
import c "github.com/couchbase/indexing/secondary/common"
Expand Down
Expand Up @@ -24,7 +24,7 @@
// server.Stop()
// }

package adminport
package server

import "fmt"
import "expvar"
Expand All @@ -39,6 +39,7 @@ import "time"
import "github.com/couchbase/indexing/secondary/security"
import "github.com/couchbase/indexing/secondary/logging"
import c "github.com/couchbase/indexing/secondary/common"
import apcommon "github.com/couchbase/indexing/secondary/adminport/common"

// httpServer is a concrete type implementing adminport Server
// interface.
Expand All @@ -47,16 +48,17 @@ type httpServer struct {
lis net.Listener // TCP listener
mux *http.ServeMux
srv *http.Server // http server
messages map[string]MessageMarshaller
messages map[string]apcommon.MessageMarshaller
conns []net.Conn
reqch chan<- Request // request channel back to application
reqch chan<- apcommon.Request // request channel back to application

// config params
name string // human readable name for this server
laddr string // address to bind and listen
urlPrefix string // URL path prefix for adminport
rtimeout time.Duration
wtimeout time.Duration
rhtimeout time.Duration
maxHdrlen int

// local
Expand All @@ -68,9 +70,9 @@ type httpServer struct {

// NewHTTPServer creates an instance of admin-server.
// Start() will actually start the server.
func NewHTTPServer(config c.Config, reqch chan<- Request) (Server, error) {
func NewHTTPServer(config c.Config, reqch chan<- apcommon.Request) (apcommon.Server, error) {
s := &httpServer{
messages: make(map[string]MessageMarshaller),
messages: make(map[string]apcommon.MessageMarshaller),
conns: make([]net.Conn, 0),
reqch: reqch,
statsInBytes: 0.0,
Expand All @@ -82,6 +84,8 @@ func NewHTTPServer(config c.Config, reqch chan<- Request) (Server, error) {
urlPrefix: config["urlPrefix"].String(),
rtimeout: time.Duration(config["readTimeout"].Int()),
wtimeout: time.Duration(config["writeTimeout"].Int()),
rhtimeout: time.Duration(config["readHeaderTimeout"].Int()),

maxHdrlen: config["maxHeaderBytes"].Int(),
}
s.logPrefix = fmt.Sprintf("%s[%s]", s.name, s.laddr)
Expand All @@ -90,12 +94,13 @@ func NewHTTPServer(config c.Config, reqch chan<- Request) (Server, error) {
s.mux.HandleFunc(s.urlPrefix, s.systemHandler)
s.mux.HandleFunc("/debug/vars", s.expvarHandler)
s.srv = &http.Server{
Addr: s.laddr,
Handler: s.mux,
ConnState: s.connState,
ReadTimeout: s.rtimeout * time.Millisecond,
WriteTimeout: s.wtimeout * time.Millisecond,
MaxHeaderBytes: s.maxHdrlen,
Addr: s.laddr,
Handler: s.mux,
ConnState: s.connState,
ReadTimeout: s.rtimeout * time.Millisecond,
WriteTimeout: s.wtimeout * time.Millisecond,
ReadHeaderTimeout: s.rhtimeout * time.Millisecond,
MaxHeaderBytes: s.maxHdrlen,
}

if err := security.SecureServer(s.srv); err != nil {
Expand All @@ -118,13 +123,13 @@ func validateAuth(w http.ResponseWriter, r *http.Request) bool {
}

// Register is part of Server interface.
func (s *httpServer) Register(msg MessageMarshaller) (err error) {
func (s *httpServer) Register(msg apcommon.MessageMarshaller) (err error) {
s.mu.Lock()
defer s.mu.Unlock()

if s.lis != nil {
logging.Errorf("%v can't register, server already started\n", s.logPrefix)
return ErrorRegisteringRequest
return apcommon.ErrorRegisteringRequest
}
key := fmt.Sprintf("%v%v", s.urlPrefix, msg.Name())
s.messages[key] = msg
Expand All @@ -142,7 +147,7 @@ func (s *httpServer) RegisterHTTPHandler(

if s.lis != nil {
logging.Errorf("%v can't register, server already started\n", s.logPrefix)
return ErrorRegisteringRequest
return apcommon.ErrorRegisteringRequest
}

switch handl := handler.(type) {
Expand All @@ -158,18 +163,18 @@ func (s *httpServer) RegisterHTTPHandler(
}

// Unregister is part of Server interface.
func (s *httpServer) Unregister(msg MessageMarshaller) (err error) {
func (s *httpServer) Unregister(msg apcommon.MessageMarshaller) (err error) {
s.mu.Lock()
defer s.mu.Unlock()

if s.lis != nil {
logging.Errorf("%v can't unregister, server already started\n", s.logPrefix)
return ErrorRegisteringRequest
return apcommon.ErrorRegisteringRequest
}
name := msg.Name()
if _, ok := s.messages[name]; !ok {
logging.Errorf("%v message %q hasn't been registered\n", s.logPrefix, name)
return ErrorMessageUnknown
return apcommon.ErrorMessageUnknown
}
delete(s.messages, name)
logging.Infof("%s unregistered %s\n", s.logPrefix, s.getURL(msg))
Expand Down Expand Up @@ -199,7 +204,7 @@ func (s *httpServer) Start() (err error) {

if s.lis != nil {
logging.Errorf("%v already started ...\n", s.logPrefix)
return ErrorServerStarted
return apcommon.ErrorServerStarted
}

if s.lis, err = security.MakeListener(s.srv.Addr); err != nil {
Expand Down Expand Up @@ -285,22 +290,22 @@ func (s *httpServer) systemHandler(w http.ResponseWriter, r *http.Request) {
// get request message type.
msg, ok := s.messages[r.URL.Path]
if !ok {
err = ErrorPathNotFound
err = apcommon.ErrorPathNotFound
http.Error(w, "path not found", http.StatusNotFound)
return
}
// read request
dataIn = make([]byte, r.ContentLength)
if err := requestRead(r.Body, dataIn); err != nil {
err = fmt.Errorf("%v, %v", ErrorRequest, err)
err = fmt.Errorf("%v, %v", apcommon.ErrorRequest, err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
// Get an instance of request type and decode request into that.
typeOfMsg := reflect.ValueOf(msg).Elem().Type()
msg = reflect.New(typeOfMsg).Interface().(MessageMarshaller)
msg = reflect.New(typeOfMsg).Interface().(apcommon.MessageMarshaller)
if err = msg.Decode(dataIn); err != nil {
err = fmt.Errorf("%v, %v", ErrorDecodeRequest, err)
err = fmt.Errorf("%v, %v", apcommon.ErrorDecodeRequest, err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
Expand All @@ -311,19 +316,19 @@ func (s *httpServer) systemHandler(w http.ResponseWriter, r *http.Request) {
val := <-waitch

switch v := (val).(type) {
case MessageMarshaller:
case apcommon.MessageMarshaller:
if dataOut, err = v.Encode(); err == nil {
header := w.Header()
header["Content-Type"] = []string{v.ContentType()}
w.Write(dataOut)

} else {
err = fmt.Errorf("%v, %v", ErrorEncodeResponse, err)
err = fmt.Errorf("%v, %v", apcommon.ErrorEncodeResponse, err)
http.Error(w, err.Error(), http.StatusInternalServerError)
}

case error:
err = fmt.Errorf("%v, %v", ErrorInternal, v)
err = fmt.Errorf("%v, %v", apcommon.ErrorInternal, v)
http.Error(w, v.Error(), http.StatusInternalServerError)
}
}
Expand Down Expand Up @@ -364,7 +369,7 @@ func (s *httpServer) connState(conn net.Conn, state http.ConnState) {
}
}

func (s *httpServer) getURL(msg MessageMarshaller) string {
func (s *httpServer) getURL(msg apcommon.MessageMarshaller) string {
return s.urlPrefix + msg.Name()
}

Expand All @@ -389,17 +394,17 @@ func requestRead(r io.Reader, data []byte) (err error) {
// concrete type implementing Request interface
type httpAdminRequest struct {
srv *httpServer
msg MessageMarshaller
msg apcommon.MessageMarshaller
waitch chan interface{}
}

// GetMessage is part of Request interface.
func (r *httpAdminRequest) GetMessage() MessageMarshaller {
func (r *httpAdminRequest) GetMessage() apcommon.MessageMarshaller {
return r.msg
}

// Send is part of Request interface.
func (r *httpAdminRequest) Send(msg MessageMarshaller) error {
func (r *httpAdminRequest) Send(msg apcommon.MessageMarshaller) error {
r.waitch <- msg
close(r.waitch)
return nil
Expand Down
9 changes: 9 additions & 0 deletions secondary/common/config.go
Expand Up @@ -308,6 +308,15 @@ var SystemConfig = Config{
true, // immutable
false, // case-insensitive
},
"projector.adminport.readHeaderTimeout": ConfigValue{
5 * 1000,
"timeout in milliseconds, is http server's read header timeout, " +
"also refer to projector.dataport.harakiriTimeout, " +
"projector.adminport.readTimeout and indexer.dataport.tcpReadDeadline",
5 * 1000,
true, // immutable
false, // case-insensitive
},
"projector.adminport.writeTimeout": ConfigValue{
0,
"timeout in milliseconds, is http server's write timeout",
Expand Down
8 changes: 4 additions & 4 deletions secondary/projector/adminport.go
Expand Up @@ -3,7 +3,7 @@ package projector
import "os"
import "time"

import ap "github.com/couchbase/indexing/secondary/adminport"
import apcommon "github.com/couchbase/indexing/secondary/adminport/common"
import c "github.com/couchbase/indexing/secondary/common"
import protobuf "github.com/couchbase/indexing/secondary/protobuf/projector"
import "github.com/couchbase/indexing/secondary/logging"
Expand All @@ -25,7 +25,7 @@ var reqStats = c.Statistics{}
var angioToken = uint16(1)

// admin-port entry point, once started never shutsdown.
func (p *Projector) mainAdminPort(reqch chan ap.Request) {
func (p *Projector) mainAdminPort(reqch chan apcommon.Request) {
p.admind.Register(reqVbmap)
p.admind.Register(reqFailoverLog)
p.admind.Register(reqMutationFeed)
Expand Down Expand Up @@ -87,8 +87,8 @@ loop:
}

// re-entrant / concurrent request handler.
func (p *Projector) handleRequest(req ap.Request, opaque uint16) {
var response ap.MessageMarshaller
func (p *Projector) handleRequest(req apcommon.Request, opaque uint16) {
var response apcommon.MessageMarshaller
var err error

msg := req.GetMessage()
Expand Down
7 changes: 4 additions & 3 deletions secondary/projector/client/client.go
Expand Up @@ -70,7 +70,8 @@ import (

"github.com/couchbase/indexing/secondary/logging"

ap "github.com/couchbase/indexing/secondary/adminport"
apclient "github.com/couchbase/indexing/secondary/adminport/client"
apcommon "github.com/couchbase/indexing/secondary/adminport/common"

c "github.com/couchbase/indexing/secondary/common"

Expand Down Expand Up @@ -134,7 +135,7 @@ var ErrorResponseTimeout = errors.New("feed.responseTimeout")
// issues request and get back response.
type Client struct {
adminport string
ap ap.Client
ap apcommon.Client
// config
maxVbuckets int
retryInterval int
Expand All @@ -152,7 +153,7 @@ func NewClient(adminport string, maxvbs int, config c.Config) (*Client, error) {
expBackoff := config["exponentialBackoff"].Int()

urlPrefix := config["urlPrefix"].String()
ap, err := ap.NewHTTPClient(adminport, urlPrefix)
ap, err := apclient.NewHTTPClient(adminport, urlPrefix)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit c5573f0

Please sign in to comment.