Skip to content

Commit

Permalink
Merge 25ab8e4 into 95c623a
Browse files Browse the repository at this point in the history
  • Loading branch information
kpacha committed Oct 25, 2018
2 parents 95c623a + 25ab8e4 commit f6e2ac5
Show file tree
Hide file tree
Showing 3 changed files with 301 additions and 17 deletions.
109 changes: 92 additions & 17 deletions proxy/merging.go
Expand Up @@ -2,6 +2,9 @@ package proxy

import (
"context"
"fmt"
"regexp"
"strconv"
"strings"
"time"

Expand All @@ -24,31 +27,102 @@ func NewMergeDataMiddleware(endpointConfig *config.EndpointConfig) Middleware {
if len(next) != totalBackends {
panic(ErrNotEnoughProxies)
}
if shouldRunSequentialMerger(endpointConfig) {
patterns := make([]string, len(endpointConfig.Backend))
for i, b := range endpointConfig.Backend {
patterns[i] = b.URLPattern
}
return sequentialMerge(patterns, serviceTimeout, combiner, next...)
}
return parallelMerge(serviceTimeout, combiner, next...)
}
}

return func(ctx context.Context, request *Request) (*Response, error) {
localCtx, cancel := context.WithTimeout(ctx, serviceTimeout)
func shouldRunSequentialMerger(endpointConfig *config.EndpointConfig) bool {
if v, ok := endpointConfig.ExtraConfig[Namespace]; ok {
if e, ok := v.(map[string]interface{}); ok {
if v, ok := e[isSequentialKey]; ok {
c, ok := v.(bool)
return ok && c
}
}
}
return false
}

func parallelMerge(timeout time.Duration, rc ResponseCombiner, next ...Proxy) Proxy {
return func(ctx context.Context, request *Request) (*Response, error) {
localCtx, cancel := context.WithTimeout(ctx, timeout)

parts := make(chan *Response, totalBackends)
failed := make(chan error, totalBackends)
parts := make(chan *Response, len(next))
failed := make(chan error, len(next))

for _, n := range next {
go requestPart(localCtx, n, request, parts, failed)
for _, n := range next {
go requestPart(localCtx, n, request, parts, failed)
}

acc := newIncrementalMergeAccumulator(len(next), rc)
for i := 0; i < len(next); i++ {
select {
case err := <-failed:
acc.Merge(nil, err)
case response := <-parts:
acc.Merge(response, nil)
}
}

acc := newIncrementalMergeAccumulator(totalBackends, combiner)
for i := 0; i < totalBackends; i++ {
select {
case err := <-failed:
acc.Merge(nil, err)
case response := <-parts:
acc.Merge(response, nil)
result, err := acc.Result()
cancel()
return result, err
}
}

var reMergeKey = regexp.MustCompile(`/?.*\{resp(\d+)_(.+)\}`)

func sequentialMerge(patterns []string, timeout time.Duration, rc ResponseCombiner, next ...Proxy) Proxy {
return func(ctx context.Context, request *Request) (*Response, error) {
localCtx, cancel := context.WithTimeout(ctx, timeout)

parts := make([]*Response, len(next))
out := make(chan *Response, 1)
errCh := make(chan error, 1)

acc := newIncrementalMergeAccumulator(len(next), rc)
for i, n := range next {
if i > 0 {
for _, match := range reMergeKey.FindAllStringSubmatch(patterns[i], -1) {
if len(match) > 1 {
rNum, err := strconv.Atoi(match[1])
if err != nil || rNum >= i || parts[rNum] == nil {
continue
}
key := "resp" + match[1] + "_" + match[2]

v, ok := parts[rNum].Data[match[2]]
if !ok {
continue
}
request.Params[key] = fmt.Sprintf("%v", v)
}
}
}

result, err := acc.Result()
cancel()
return result, err
requestPart(localCtx, n, request, out, errCh)
select {
case err := <-errCh:
acc.Merge(nil, err)
break
case response := <-out:
acc.Merge(response, nil)
if !response.IsComplete {
break
}
parts[i] = response
}
}

result, err := acc.Result()
cancel()
return result, err
}
}

Expand Down Expand Up @@ -149,6 +223,7 @@ func RegisterResponseCombiner(name string, f ResponseCombiner) {

const (
mergeKey = "combiner"
isSequentialKey = "sequential"
defaultCombinerName = "default"
)

Expand Down
41 changes: 41 additions & 0 deletions proxy/merging_benchmark_test.go
Expand Up @@ -44,3 +44,44 @@ func BenchmarkNewMergeDataMiddleware(b *testing.B) {
})
}
}

func BenchmarkNewMergeDataMiddleware_sequential(b *testing.B) {
backend := config.Backend{}
backends := make([]*config.Backend, 10)
for i := range backends {
backends[i] = &backend
}

proxies := []Proxy{
dummyProxy(&Response{Data: map[string]interface{}{"supu": 42}, IsComplete: true}),
dummyProxy(&Response{Data: map[string]interface{}{"tupu": true}, IsComplete: true}),
dummyProxy(&Response{Data: map[string]interface{}{"foo": "bar"}, IsComplete: true}),
dummyProxy(&Response{Data: map[string]interface{}{"foobar": false}, IsComplete: true}),
dummyProxy(&Response{Data: map[string]interface{}{"qux": "false"}, IsComplete: true}),
dummyProxy(&Response{Data: map[string]interface{}{"data": "the quick brow fox"}, IsComplete: true}),
dummyProxy(&Response{Data: map[string]interface{}{"status": "ok"}, IsComplete: true}),
dummyProxy(&Response{Data: map[string]interface{}{"aaaa": "aaaaaaaaaaaa"}, IsComplete: true}),
dummyProxy(&Response{Data: map[string]interface{}{"bbbbb": 3.14}, IsComplete: true}),
dummyProxy(&Response{Data: map[string]interface{}{"cccc": map[string]interface{}{"a": 42}}, IsComplete: true}),
}

for _, totalParts := range []int{2, 3, 4, 5, 6, 7, 8, 9, 10} {
b.Run(fmt.Sprintf("with %d parts", totalParts), func(b *testing.B) {
endpoint := config.EndpointConfig{
Backend: backends[:totalParts],
Timeout: time.Duration(100) * time.Millisecond,
ExtraConfig: config.ExtraConfig{
Namespace: map[string]interface{}{
isSequentialKey: true,
},
},
}
proxy := NewMergeDataMiddleware(&endpoint)(proxies[:totalParts]...)
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
proxy(context.Background(), &Request{})
}
})
}
}
168 changes: 168 additions & 0 deletions proxy/merging_test.go
Expand Up @@ -2,6 +2,7 @@ package proxy

import (
"context"
"errors"
"testing"
"time"

Expand Down Expand Up @@ -41,6 +42,173 @@ func TestNewMergeDataMiddleware_ok(t *testing.T) {
}
}

func TestNewMergeDataMiddleware_sequential(t *testing.T) {
timeout := 500
endpoint := config.EndpointConfig{
Backend: []*config.Backend{
{URLPattern: "/"},
{URLPattern: "/aaa/{resp0_supu}"},
{URLPattern: "/aaa/{resp0_supu}?x={resp1_tupu}"},
},
Timeout: time.Duration(timeout) * time.Millisecond,
ExtraConfig: config.ExtraConfig{
Namespace: map[string]interface{}{
isSequentialKey: true,
},
},
}
mw := NewMergeDataMiddleware(&endpoint)
p := mw(
dummyProxy(&Response{Data: map[string]interface{}{"supu": 42}, IsComplete: true}),
func(ctx context.Context, r *Request) (*Response, error) {
if r.Params["resp0_supu"] != "42" {
t.Errorf("request without the expected set of params")
}
return &Response{Data: map[string]interface{}{"tupu": "foo"}, IsComplete: true}, nil
},
func(ctx context.Context, r *Request) (*Response, error) {
if r.Params["resp0_supu"] != "42" {
t.Errorf("request without the expected set of params")
}
if r.Params["resp1_tupu"] != "foo" {
t.Errorf("request without the expected set of params")
}
return &Response{Data: map[string]interface{}{"aaaa": []int{1, 2, 3}}, IsComplete: true}, nil
},
)
mustEnd := time.After(time.Duration(2*timeout) * time.Millisecond)
out, err := p(context.Background(), &Request{Params: map[string]string{}})
if err != nil {
t.Errorf("The middleware propagated an unexpected error: %s\n", err.Error())
}
if out == nil {
t.Errorf("The proxy returned a null result\n")
return
}
select {
case <-mustEnd:
t.Errorf("We were expecting a response but we got none\n")
default:
if len(out.Data) != 3 {
t.Errorf("We weren't expecting a partial response but we got %v!\n", out)
}
if !out.IsComplete {
t.Errorf("We were expecting a completed response but we got an incompleted one!\n")
}
}
}

func TestNewMergeDataMiddleware_sequential_unavailableParams(t *testing.T) {
timeout := 500
endpoint := config.EndpointConfig{
Backend: []*config.Backend{
{URLPattern: "/"},
{URLPattern: "/aaa/{resp2_supu}"},
{URLPattern: "/aaa/{resp0_tupu}?x={resp1_tupu}"},
},
Timeout: time.Duration(timeout) * time.Millisecond,
ExtraConfig: config.ExtraConfig{
Namespace: map[string]interface{}{
isSequentialKey: true,
},
},
}
mw := NewMergeDataMiddleware(&endpoint)
p := mw(
dummyProxy(&Response{Data: map[string]interface{}{"supu": 42}, IsComplete: true}),
func(ctx context.Context, r *Request) (*Response, error) {
if v, ok := r.Params["resp0_supu"]; ok || v != "" {
t.Errorf("request with unexpected set of params")
}
return &Response{Data: map[string]interface{}{"tupu": "foo"}, IsComplete: true}, nil
},
func(ctx context.Context, r *Request) (*Response, error) {
if v, ok := r.Params["resp0_supu"]; ok || v != "" {
t.Errorf("request with unexpected set of params")
}
if r.Params["respo_tupu"] != "" {
t.Errorf("request without the expected set of params")
}
if r.Params["resp1_tupu"] != "foo" {
t.Errorf("request without the expected set of params")
}
return &Response{Data: map[string]interface{}{"aaaa": []int{1, 2, 3}}, IsComplete: true}, nil
},
)
mustEnd := time.After(time.Duration(2*timeout) * time.Millisecond)
out, err := p(context.Background(), &Request{Params: map[string]string{}})
if err != nil {
t.Errorf("The middleware propagated an unexpected error: %s\n", err.Error())
}
if out == nil {
t.Errorf("The proxy returned a null result\n")
return
}
select {
case <-mustEnd:
t.Errorf("We were expecting a response but we got none\n")
default:
if len(out.Data) != 3 {
t.Errorf("We weren't expecting a partial response but we got %v!\n", out)
}
if !out.IsComplete {
t.Errorf("We were expecting a completed response but we got an incompleted one!\n")
}
}
}

func TestNewMergeDataMiddleware_sequential_erroredBackend(t *testing.T) {
timeout := 500
endpoint := config.EndpointConfig{
Backend: []*config.Backend{
{URLPattern: "/"},
{URLPattern: "/aaa/{resp0_supu}"},
{URLPattern: "/aaa/{resp0_supu}?x={resp1_tupu}"},
},
Timeout: time.Duration(timeout) * time.Millisecond,
ExtraConfig: config.ExtraConfig{
Namespace: map[string]interface{}{
isSequentialKey: true,
},
},
}
expecterErr := errors.New("wait for me")
mw := NewMergeDataMiddleware(&endpoint)
p := mw(
dummyProxy(&Response{Data: map[string]interface{}{"supu": 42}, IsComplete: true}),
func(ctx context.Context, r *Request) (*Response, error) {
if r.Params["resp0_supu"] != "42" {
t.Errorf("request without the expected set of params")
}
return nil, expecterErr
},
func(ctx context.Context, r *Request) (*Response, error) {
return nil, nil
},
)
mustEnd := time.After(time.Duration(2*timeout) * time.Millisecond)
out, err := p(context.Background(), &Request{Params: map[string]string{}})
if err == nil {
t.Errorf("The middleware did not propagate an error\n")
return
}
if out == nil {
t.Errorf("The proxy returned a null result\n")
return
}
select {
case <-mustEnd:
t.Errorf("We were expecting a response but we got none\n")
default:
if len(out.Data) != 1 {
t.Errorf("We weren't expecting a partial response but we got %v!\n", out)
}
if out.IsComplete {
t.Errorf("We were not expecting a completed response!\n")
}
}
}

func TestNewMergeDataMiddleware_mergeIncompleteResults(t *testing.T) {
timeout := 500
backend := config.Backend{}
Expand Down

0 comments on commit f6e2ac5

Please sign in to comment.