Skip to content

Commit

Permalink
Merge pull request #146 from devopsfaith/incomplete_responses
Browse files Browse the repository at this point in the history
check the proxy response before returning an error
  • Loading branch information
kpacha committed Sep 1, 2018
2 parents f003ffd + 45557c5 commit b978521
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 24 deletions.
19 changes: 9 additions & 10 deletions router/gin/endpoint.go
Expand Up @@ -3,7 +3,6 @@ package gin
import (
"context"
"fmt"
"net/http"
"strings"
"time"

Expand Down Expand Up @@ -37,21 +36,16 @@ func CustomErrorEndpointHandler(configuration *config.EndpointConfig, prxy proxy
c.Header(core.KrakendHeaderName, core.KrakendHeaderValue)

response, err := prxy(requestCtx, requestGenerator(c, configuration.QueryString))
if err != nil {
abort(c, errF(err))
cancel()
return
}

select {
case <-requestCtx.Done():
abort(c, http.StatusInternalServerError)
cancel()
return
if err == nil {
err = router.ErrInternalError
}
default:
}

if response != nil {
if response != nil && len(response.Data) > 0 {
if response.IsComplete {
c.Header(router.CompleteResponseHeaderName, router.HeaderCompleteResponseValue)
if isCacheEnabled {
Expand All @@ -65,6 +59,11 @@ func CustomErrorEndpointHandler(configuration *config.EndpointConfig, prxy proxy
c.Header(k, v[0])
}
} else {
if err != nil {
abort(c, errF(err))
cancel()
return
}
c.Header(router.CompleteResponseHeaderName, router.HeaderIncompleteResponseValue)
}

Expand Down
29 changes: 26 additions & 3 deletions router/gin/endpoint_test.go
Expand Up @@ -3,7 +3,7 @@ package gin
import (
"bytes"
"context"
"fmt"
"errors"
"io/ioutil"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -59,19 +59,42 @@ func TestEndpointHandler_incomplete(t *testing.T) {

func TestEndpointHandler_ko(t *testing.T) {
p := func(_ context.Context, _ *proxy.Request) (*proxy.Response, error) {
return nil, fmt.Errorf("This is %s", "a dummy error")
return nil, errors.New("This is a dummy error")
}
testEndpointHandler(t, 10, p, "", "", "", http.StatusInternalServerError, false)
}

func TestEndpointHandler_cancel(t *testing.T) {
func TestEndpointHandler_incompleteAndErrored(t *testing.T) {
p := func(_ context.Context, _ *proxy.Request) (*proxy.Response, error) {
return &proxy.Response{
IsComplete: false,
Data: map[string]interface{}{"foo": "bar"},
}, errors.New("This is a dummy error")
}
expectedBody := "{\"foo\":\"bar\"}"
testEndpointHandler(t, 10, p, expectedBody, "", "application/json; charset=utf-8", http.StatusOK, false)
}

func TestEndpointHandler_cancelEmpty(t *testing.T) {
p := func(_ context.Context, _ *proxy.Request) (*proxy.Response, error) {
time.Sleep(100 * time.Millisecond)
return nil, nil
}
testEndpointHandler(t, 0, p, "", "", "", http.StatusInternalServerError, false)
}

func TestEndpointHandler_cancel(t *testing.T) {
p := func(_ context.Context, _ *proxy.Request) (*proxy.Response, error) {
time.Sleep(100 * time.Millisecond)
return &proxy.Response{
IsComplete: false,
Data: map[string]interface{}{"foo": "bar"},
}, nil
}
expectedBody := "{\"foo\":\"bar\"}"
testEndpointHandler(t, 0, p, expectedBody, "", "application/json; charset=utf-8", http.StatusOK, false)
}

func TestEndpointHandler_noop(t *testing.T) {
testEndpointHandler(t, 10, proxy.NoopProxy, "{}", "", "application/json; charset=utf-8", http.StatusOK, false)
}
Expand Down
21 changes: 10 additions & 11 deletions router/mux/endpoint.go
Expand Up @@ -48,23 +48,16 @@ func CustomEndpointHandlerWithHTTPError(rb RequestBuilder, errF router.ToHTTPErr
requestCtx, cancel := context.WithTimeout(context.Background(), endpointTimeout)

response, err := prxy(requestCtx, rb(r, configuration.QueryString, headersToSend))
if err != nil {
w.Header().Set(router.CompleteResponseHeaderName, router.HeaderIncompleteResponseValue)
http.Error(w, err.Error(), errF(err))
cancel()
return
}

select {
case <-requestCtx.Done():
w.Header().Set(router.CompleteResponseHeaderName, router.HeaderIncompleteResponseValue)
http.Error(w, router.ErrInternalError.Error(), http.StatusInternalServerError)
cancel()
return
if err == nil {
err = router.ErrInternalError
}
default:
}

if response != nil {
if response != nil && len(response.Data) > 0 {
if response.IsComplete {
w.Header().Set(router.CompleteResponseHeaderName, router.HeaderCompleteResponseValue)
if isCacheEnabled {
Expand All @@ -78,6 +71,12 @@ func CustomEndpointHandlerWithHTTPError(rb RequestBuilder, errF router.ToHTTPErr
w.Header().Set(k, v[0])
}
} else {
if err != nil {
w.Header().Set(router.CompleteResponseHeaderName, router.HeaderIncompleteResponseValue)
http.Error(w, err.Error(), errF(err))
cancel()
return
}
w.Header().Set(router.CompleteResponseHeaderName, router.HeaderIncompleteResponseValue)
}

Expand Down
25 changes: 25 additions & 0 deletions router/mux/endpoint_test.go
Expand Up @@ -3,6 +3,7 @@ package mux
import (
"bytes"
"context"
"errors"
"fmt"
"io/ioutil"
"net/http"
Expand Down Expand Up @@ -47,7 +48,31 @@ func TestEndpointHandler_ko(t *testing.T) {
time.Sleep(5 * time.Millisecond)
}

func TestEndpointHandler_incompleteAndErrored(t *testing.T) {
p := func(_ context.Context, _ *proxy.Request) (*proxy.Response, error) {
return &proxy.Response{
IsComplete: false,
Data: map[string]interface{}{"foo": "bar"},
}, errors.New("This is a dummy error")
}
expectedBody := "{\"foo\":\"bar\"}"
testEndpointHandler(t, 10, p, "GET", expectedBody, "", "application/json", http.StatusOK, false)
time.Sleep(5 * time.Millisecond)
}

func TestEndpointHandler_cancel(t *testing.T) {
p := func(_ context.Context, _ *proxy.Request) (*proxy.Response, error) {
time.Sleep(100 * time.Millisecond)
return &proxy.Response{
IsComplete: false,
Data: map[string]interface{}{"foo": "bar"},
}, nil
}
testEndpointHandler(t, 0, p, "GET", "{\"foo\":\"bar\"}", "", "application/json", http.StatusOK, false)
time.Sleep(5 * time.Millisecond)
}

func TestEndpointHandler_cancelEmpty(t *testing.T) {
p := func(_ context.Context, _ *proxy.Request) (*proxy.Response, error) {
time.Sleep(100 * time.Millisecond)
return nil, nil
Expand Down

0 comments on commit b978521

Please sign in to comment.