Skip to content

Commit

Permalink
MB-53297: Fix systemHandler() crashed: send on closed channel
Browse files Browse the repository at this point in the history
- When security context changes, projector closes the request channel on which adminport server writes requests (Reader is closing the channel)
- This code change closes the channel from writer's side

Change-Id: I46580a70299a7c1a329903394c5db36ce13545e2
  • Loading branch information
dhananjay-cb committed Dec 12, 2022
1 parent 9b63d4c commit 926cdb0
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 31 deletions.
10 changes: 8 additions & 2 deletions secondary/adminport/common/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
// statistics, administering and managing cluster.
package common

import "errors"
import c "github.com/couchbase/indexing/secondary/common"
import (
"errors"

c "github.com/couchbase/indexing/secondary/common"
)

// errors codes

Expand Down Expand Up @@ -83,6 +86,9 @@ type Server interface {
// GetStatistics returns server statistics.
GetStatistics() c.Statistics

// Sets closeReqCh state to REQCH_CLOSE which enables server to close its request channel
CloseReqch()

// Stop server routine.
Stop()
}
Expand Down
97 changes: 68 additions & 29 deletions secondary/adminport/server/admin_httpd.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,21 @@ import (
"github.com/couchbase/indexing/secondary/logging"
"github.com/couchbase/indexing/secondary/security"

c "github.com/couchbase/indexing/secondary/common"

apcommon "github.com/couchbase/indexing/secondary/adminport/common"
c "github.com/couchbase/indexing/secondary/common"
)

// httpServer is a concrete type implementing adminport Server
// interface.
type httpServer struct {
mu sync.Mutex // handle concurrent updates to this object
lis net.Listener // TCP listener
mux *http.ServeMux
srv *http.Server // http server
messages map[string]apcommon.MessageMarshaller
conns map[string]net.Conn
reqch chan<- apcommon.Request // request channel back to application
mu sync.Mutex // handle concurrent updates to this object
lis net.Listener // TCP listener
mux *http.ServeMux
srv *http.Server // http server
messages map[string]apcommon.MessageMarshaller
conns map[string]net.Conn
reqch chan<- apcommon.Request // request channel back to application
reqChState ChannelState

// config params
name string // human readable name for this server
Expand All @@ -80,6 +80,7 @@ func NewHTTPServer(config c.Config, reqch chan<- apcommon.Request) (apcommon.Ser
messages: make(map[string]apcommon.MessageMarshaller),
conns: make(map[string]net.Conn),
reqch: reqch,
reqChState: REQCH_OPEN,
statsInBytes: 0.0,
statsOutBytes: 0.0,
statsMessages: make(map[string][3]uint64),
Expand Down Expand Up @@ -115,6 +116,15 @@ func NewHTTPServer(config c.Config, reqch chan<- apcommon.Request) (apcommon.Ser
return s, nil
}

// states for admin-server's request channel
type ChannelState byte

const (
REQCH_OPEN ChannelState = iota // 0
REQCH_CLOSE
REQCH_CLOSE_DONE
)

func validateAuth(w http.ResponseWriter, r *http.Request) bool {
_, valid, err := c.IsAuthValid(r)
if err != nil {
Expand Down Expand Up @@ -250,11 +260,18 @@ func (s *httpServer) shutdown() {
for _, conn := range s.conns {
conn.Close()
}
close(s.reqch)
s.lis = nil
}
}

func (s *httpServer) CloseReqch() {
s.mu.Lock()
defer s.mu.Unlock()
if s.reqChState == REQCH_OPEN {
s.reqChState = REQCH_CLOSE
}
}

// handle incoming request.
func (s *httpServer) systemHandler(w http.ResponseWriter, r *http.Request) {

Expand Down Expand Up @@ -316,26 +333,48 @@ func (s *httpServer) systemHandler(w http.ResponseWriter, r *http.Request) {
return
}

waitch := make(chan interface{}, 1)
// send and wait
s.reqch <- &httpAdminRequest{srv: s, msg: msg, waitch: waitch}
val := <-waitch

switch v := (val).(type) {
case apcommon.MessageMarshaller:
if dataOut, err = v.Encode(); err == nil {
header := w.Header()
header["Content-Type"] = []string{v.ContentType()}
w.Write(dataOut)

} else {
err = fmt.Errorf("%v, %v", apcommon.ErrorEncodeResponse, err)
http.Error(w, err.Error(), http.StatusInternalServerError)
}
s.mu.Lock()
currReqchState := s.reqChState
s.mu.Unlock()

if currReqchState == REQCH_CLOSE {
func() {
s.mu.Lock()
defer s.mu.Unlock()

case error:
err = fmt.Errorf("%v, %v", apcommon.ErrorInternal, v)
http.Error(w, v.Error(), http.StatusInternalServerError)
close(s.reqch)
logging.Infof("%s request channel closed. Server restarting.\n", s.logPrefix)
s.reqChState = REQCH_CLOSE_DONE
}()
err = fmt.Errorf("%v %v", apcommon.ErrorInternal, "server restarting")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
} else if currReqchState == REQCH_CLOSE_DONE {
err = fmt.Errorf("%v %v", apcommon.ErrorInternal, "server restarting")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
} else {
waitch := make(chan interface{}, 1)
// send and wait
s.reqch <- &httpAdminRequest{srv: s, msg: msg, waitch: waitch}
val := <-waitch

switch v := (val).(type) {
case apcommon.MessageMarshaller:
if dataOut, err = v.Encode(); err == nil {
header := w.Header()
header["Content-Type"] = []string{v.ContentType()}
w.Write(dataOut)

} else {
err = fmt.Errorf("%v, %v", apcommon.ErrorEncodeResponse, err)
http.Error(w, err.Error(), http.StatusInternalServerError)
}

case error:
err = fmt.Errorf("%v, %v", apcommon.ErrorInternal, v)
http.Error(w, v.Error(), http.StatusInternalServerError)
}
}
}

Expand Down
1 change: 1 addition & 0 deletions secondary/projector/projector.go
Original file line number Diff line number Diff line change
Expand Up @@ -1128,6 +1128,7 @@ func (p *Projector) initSecurityContext(encryptLocalHost bool) error {
return err
}

p.admind.CloseReqch()
// restart HTTPS server
p.admind.Stop()
time.Sleep(500 * time.Millisecond)
Expand Down

0 comments on commit 926cdb0

Please sign in to comment.