Skip to content

Commit

Permalink
[v12] Fix improper report of status on success (#24155)
Browse files Browse the repository at this point in the history
* Fix improper report of status on success

This PR fixes an edge case where the exec status wasn't properly
returned to the clients when the remote execution was successfull.

Fixes #24106

* return a proper error instead of timeout
  • Loading branch information
tigrato committed Apr 5, 2023
1 parent 7e9f18d commit 35f9e42
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 15 deletions.
25 changes: 16 additions & 9 deletions lib/kube/proxy/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1470,14 +1470,18 @@ func (f *Forwarder) execNonInteractive(ctx *authContext, w http.ResponseWriter,
}

streamOptions := proxy.options()
if err = executor.StreamWithContext(req.Context(), streamOptions); err != nil {
err = executor.StreamWithContext(req.Context(), streamOptions)
// send the status back to the client when forwarding mode is enabled
// sendStatus sends a payload even if the error is nil to make sure the client
// receives the status and can close the connection.
if sendErr := proxy.sendStatus(err); sendErr != nil {
f.log.WithError(sendErr).Warning("Failed to send status. Exec command was aborted by client.")
}
if err != nil {
execEvent.Code = events.ExecFailureCode
execEvent.Error, execEvent.ExitCode = exitCode(err)

f.log.WithError(err).Warning("Executor failed while streaming.")
if err := proxy.sendStatus(err); err != nil {
f.log.WithError(err).Warning("Failed to send status. Exec command was aborted by client.")
}
// do not return the error otherwise the fwd.withAuth interceptor will try to write it into a hijacked connection
return nil, nil
}
Expand Down Expand Up @@ -1607,12 +1611,15 @@ func (f *Forwarder) remoteExec(ctx *authContext, w http.ResponseWriter, req *htt
return nil, trace.Wrap(err)
}
streamOptions := proxy.options()
if err = executor.StreamWithContext(req.Context(), streamOptions); err != nil {
err = executor.StreamWithContext(req.Context(), streamOptions)
// send the status back to the client when forwarding mode is enabled
// sendStatus sends a payload even if the error is nil to make sure the client
// receives the status and can close the connection.
if sendErr := proxy.sendStatus(err); sendErr != nil {
f.log.WithError(sendErr).Warning("Failed to send status. Exec command was aborted by client.")
}
if err != nil {
f.log.WithError(err).Warning("Executor failed while streaming.")
// send the status back to the client when forwarding mode is enabled
if err := proxy.sendStatus(err); err != nil {
f.log.WithError(err).Warning("Failed to send status. Exec command was aborted by client.")
}
// do not return the error otherwise the fwd.withAuth interceptor will try to write it into a hijacked connection
return nil, nil
}
Expand Down
24 changes: 18 additions & 6 deletions lib/kube/proxy/websocket_client_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"net/url"
"strings"
"sync"
"sync/atomic"

gwebsocket "github.com/gorilla/websocket"
"github.com/gravitational/trace"
Expand Down Expand Up @@ -251,6 +252,7 @@ func (e *wsStreamClient) Close() {
// CLOSE
func (e *wsStreamClient) stream(conn *gwebsocket.Conn, options clientremotecommand.StreamOptions) error {
errChan := make(chan error, 3)
statusReport := &atomic.Bool{}
wg := sync.WaitGroup{}
if options.Stdin != nil {
wg.Add(1)
Expand Down Expand Up @@ -310,7 +312,7 @@ func (e *wsStreamClient) stream(conn *gwebsocket.Conn, options clientremotecomma
case streamStderr:
w = options.Stderr
case streamErr:
_, err := parseError(buf[1:])
_, err := parseError(buf[1:], statusReport)
errChan <- err
// Once we receive an error from streamErr, we must stop processing.
// The server also stops the execution and closes the connection.
Expand All @@ -336,10 +338,9 @@ func (e *wsStreamClient) stream(conn *gwebsocket.Conn, options clientremotecomma
if err != nil {
// check the connection was properly closed by server, and if true ignore the error.
var websocketErr *gwebsocket.CloseError
if errors.As(err, &websocketErr) && websocketErr.Code == gwebsocket.CloseNormalClosure {
err = nil
if errors.As(err, &websocketErr) && websocketErr.Code != gwebsocket.CloseNormalClosure {
errChan <- err
}
errChan <- err
return
}
e.mu.Lock()
Expand All @@ -351,7 +352,15 @@ func (e *wsStreamClient) stream(conn *gwebsocket.Conn, options clientremotecomma

wg.Wait()
close(errChan)
// always expect an error from the errChan since it means that the connection was closed
// by the server by sending a streamErr with the error.
// If no error happened during the remote execution, the server sends a streamErr with
// status = Success field.
err := <-errChan
// only check if status was reported on success.
if err == nil && !statusReport.Load() {
return trace.ConnectionProblem(nil, "server didn't report exec status using the error websocket channel")
}
return err
}

Expand Down Expand Up @@ -510,7 +519,7 @@ func dial(rt http.RoundTripper, method string, url string) error {
// drain response body

_ = resp.Body.Close()
isStatusErr, err := parseError(responseErrorBytes)
isStatusErr, err := parseError(responseErrorBytes, nil)
if isStatusErr {
return err
}
Expand Down Expand Up @@ -542,9 +551,12 @@ func (e *wsStreamClient) RoundTrip(request *http.Request) (retResp *http.Respons
}

// parseError parses the error received from Kube API and checks if the returned error is *metav1.Status
func parseError(errorBytes []byte) (bool, error) {
func parseError(errorBytes []byte, statusReporter *atomic.Bool) (bool, error) {
if obj, _, err := statusCodecs.UniversalDecoder().Decode(errorBytes, nil, &metav1.Status{}); err == nil {
if status, ok := obj.(*metav1.Status); ok && status.Status == metav1.StatusSuccess {
if statusReporter != nil {
statusReporter.Store(true)
}
return true, nil
} else if ok {
return true, &apierrors.StatusError{ErrStatus: *status}
Expand Down

0 comments on commit 35f9e42

Please sign in to comment.