Skip to content

Commit

Permalink
incremental response merging added
Browse files Browse the repository at this point in the history
  • Loading branch information
kpacha committed Apr 13, 2018
1 parent 4893459 commit a998a9d
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 27 deletions.
89 changes: 75 additions & 14 deletions proxy/merging.go
Expand Up @@ -2,6 +2,7 @@ package proxy

import (
"context"
"strings"
"time"

"github.com/devopsfaith/krakend/config"
Expand All @@ -27,35 +28,76 @@ func NewMergeDataMiddleware(endpointConfig *config.EndpointConfig) Middleware {
return func(ctx context.Context, request *Request) (*Response, error) {
localCtx, cancel := context.WithTimeout(ctx, serviceTimeout)

parts := make(chan *Response, len(next))
failed := make(chan error, len(next))
parts := make(chan *Response, totalBackends)
failed := make(chan error, totalBackends)

for _, n := range next {
go requestPart(localCtx, n, request, parts, failed)
}

var err error
responses := make([]*Response, len(next))
isEmpty := true
for i := 0; i < len(next); i++ {
acc := newIncrementalMergeAccumulator(totalBackends, combiner)
for i := 0; i < totalBackends; i++ {
select {
case err = <-failed:
case responses[i] = <-parts:
isEmpty = false
case err := <-failed:
acc.Merge(nil, err)
case response := <-parts:
acc.Merge(response, nil)
}
}
if isEmpty {
cancel()
return &Response{Data: make(map[string]interface{}), IsComplete: false}, err
}

result := combiner(totalBackends, responses)
result, err := acc.Result()
cancel()
return result, err
}
}
}

type incrementalMergeAccumulator struct {
pending int
data *Response
combiner ResponseCombiner
errs []error
}

func newIncrementalMergeAccumulator(total int, combiner ResponseCombiner) *incrementalMergeAccumulator {
return &incrementalMergeAccumulator{
pending: total,
combiner: combiner,
errs: []error{},
}
}

func (i *incrementalMergeAccumulator) Merge(res *Response, err error) {
i.pending--
if err != nil {
i.errs = append(i.errs, err)
if i.data != nil {
i.data.IsComplete = false
}
return
}
if res == nil {
i.errs = append(i.errs, errNullResult)
return
}
if i.data == nil {
i.data = res
return
}
i.data = i.combiner(2, []*Response{i.data, res})
}

func (i *incrementalMergeAccumulator) Result() (*Response, error) {
if i.data == nil {
return &Response{Data: make(map[string]interface{}, 0), IsComplete: false}, newMergeError(i.errs)
}

if i.pending != 0 {
i.data.IsComplete = false
}
return i.data, newMergeError(i.errs)
}

func requestPart(ctx context.Context, next Proxy, request *Request, out chan<- *Response, failed chan<- error) {
localCtx, cancel := context.WithCancel(ctx)

Expand All @@ -78,6 +120,25 @@ func requestPart(ctx context.Context, next Proxy, request *Request, out chan<- *
cancel()
}

func newMergeError(errs []error) error {
if len(errs) == 0 {
return nil
}
return mergeError{errs}
}

type mergeError struct {
errs []error
}

func (m mergeError) Error() string {
msg := make([]string, len(m.errs))
for i, err := range m.errs {
msg[i] = err.Error()
}
return strings.Join(msg, "\n")
}

// ResponseCombiner func to merge the collected responses into a single one
type ResponseCombiner func(int, []*Response) *Response

Expand Down
28 changes: 19 additions & 9 deletions proxy/merging_benchmark_test.go
Expand Up @@ -11,21 +11,31 @@ import (

func BenchmarkNewMergeDataMiddleware(b *testing.B) {
backend := config.Backend{}
backends := []*config.Backend{&backend, &backend, &backend, &backend}
backends := make([]*config.Backend, 10)
for i := range backends {
backends[i] = &backend
}

partial1 := dummyProxy(&Response{Data: map[string]interface{}{"supu": 42}, IsComplete: true})
partial2 := dummyProxy(&Response{Data: map[string]interface{}{"tupu": true}, IsComplete: true})
partial3 := dummyProxy(&Response{Data: map[string]interface{}{"foo": "bar"}, IsComplete: true})
partial4 := dummyProxy(&Response{Data: map[string]interface{}{"foobar": false}, IsComplete: true})
proxies := []Proxy{partial1, partial2, partial3, partial4}
proxies := []Proxy{
dummyProxy(&Response{Data: map[string]interface{}{"supu": 42}, IsComplete: true}),
dummyProxy(&Response{Data: map[string]interface{}{"tupu": true}, IsComplete: true}),
dummyProxy(&Response{Data: map[string]interface{}{"foo": "bar"}, IsComplete: true}),
dummyProxy(&Response{Data: map[string]interface{}{"foobar": false}, IsComplete: true}),
dummyProxy(&Response{Data: map[string]interface{}{"qux": "false"}, IsComplete: true}),
dummyProxy(&Response{Data: map[string]interface{}{"data": "the quick brow fox"}, IsComplete: true}),
dummyProxy(&Response{Data: map[string]interface{}{"status": "ok"}, IsComplete: true}),
dummyProxy(&Response{Data: map[string]interface{}{"aaaa": "aaaaaaaaaaaa"}, IsComplete: true}),
dummyProxy(&Response{Data: map[string]interface{}{"bbbbb": 3.14}, IsComplete: true}),
dummyProxy(&Response{Data: map[string]interface{}{"cccc": map[string]interface{}{"a": 42}}, IsComplete: true}),
}

for testCase, totalParts := range []int{2, 3, 4} {
for _, totalParts := range []int{2, 3, 4, 5, 6, 7, 8, 9, 10} {
b.Run(fmt.Sprintf("with %d parts", totalParts), func(b *testing.B) {
endpoint := config.EndpointConfig{
Backend: backends[:testCase+2],
Backend: backends[:totalParts],
Timeout: time.Duration(100) * time.Millisecond,
}
proxy := NewMergeDataMiddleware(&endpoint)(proxies[:testCase+2]...)
proxy := NewMergeDataMiddleware(&endpoint)(proxies[:totalParts]...)
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
Expand Down
36 changes: 32 additions & 4 deletions proxy/merging_test.go
Expand Up @@ -183,8 +183,22 @@ func TestNewMergeDataMiddleware_nullResponse(t *testing.T) {

mustEnd := time.After(time.Duration(2*timeout) * time.Millisecond)
out, err := mw(NoopProxy, NoopProxy)(context.Background(), &Request{})
if err != errNullResult {
t.Errorf("The middleware propagated an unexpected error: %s\n", err.Error())
if err == nil {
t.Errorf("The middleware did not propagate the expected error")
}
switch mergeErr := err.(type) {
case mergeError:
if len(mergeErr.errs) != 2 {
t.Errorf("The middleware propagated an unexpected error: %s", err.Error())
}
if mergeErr.errs[0] != mergeErr.errs[1] {
t.Errorf("The middleware propagated an unexpected error: %s", err.Error())
}
if mergeErr.errs[0] != errNullResult {
t.Errorf("The middleware propagated an unexpected error: %s", err.Error())
}
default:
t.Errorf("The middleware propagated an unexpected error: %s", err.Error())
}
if out == nil {
t.Errorf("The proxy returned a null result\n")
Expand Down Expand Up @@ -216,8 +230,22 @@ func TestNewMergeDataMiddleware_timeout(t *testing.T) {
delayedProxy(t, time.Duration(5*timeout)*time.Millisecond, nil))
mustEnd := time.After(time.Duration(2*timeout) * time.Millisecond)
out, err := p(context.Background(), &Request{})
if err == nil || err.Error() != "context deadline exceeded" {
t.Errorf("The middleware propagated an unexpected error: %s\n", err.Error())
if err == nil {
t.Errorf("The middleware did not propagate the expected error")
}
switch mergeErr := err.(type) {
case mergeError:
if len(mergeErr.errs) != 2 {
t.Errorf("The middleware propagated an unexpected error: %s", err.Error())
}
if mergeErr.errs[0].Error() != mergeErr.errs[1].Error() {
t.Errorf("The middleware propagated an unexpected error: %s", err.Error())
}
if mergeErr.errs[0].Error() != "context deadline exceeded" {
t.Errorf("The middleware propagated an unexpected error: %s", err.Error())
}
default:
t.Errorf("The middleware propagated an unexpected error: %s", err.Error())
}
if out == nil {
t.Errorf("The proxy returned a null result\n")
Expand Down

0 comments on commit a998a9d

Please sign in to comment.