Skip to content

Commit

Permalink
combiner register added
Browse files Browse the repository at this point in the history
  • Loading branch information
kpacha committed Mar 20, 2018
1 parent f7f2722 commit fc26cc3
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 4 deletions.
44 changes: 42 additions & 2 deletions proxy/merging.go
Expand Up @@ -2,6 +2,7 @@ package proxy

import (
"context"
"sync"
"time"

"github.com/devopsfaith/krakend/config"
Expand All @@ -17,6 +18,7 @@ func NewMergeDataMiddleware(endpointConfig *config.EndpointConfig) Middleware {
return EmptyMiddleware
}
serviceTimeout := time.Duration(85*endpointConfig.Timeout.Nanoseconds()/100) * time.Nanosecond
combiner := getResponseCombiner(endpointConfig.ExtraConfig)

return func(next ...Proxy) Proxy {
if len(next) != totalBackends {
Expand Down Expand Up @@ -48,7 +50,7 @@ func NewMergeDataMiddleware(endpointConfig *config.EndpointConfig) Middleware {
return &Response{Data: make(map[string]interface{}), IsComplete: false}, err
}

result := combineData(totalBackends, responses)
result := combiner(localCtx, totalBackends, responses)
cancel()
return result, err
}
Expand Down Expand Up @@ -77,7 +79,45 @@ func requestPart(ctx context.Context, next Proxy, request *Request, out chan<- *
cancel()
}

func combineData(total int, parts []*Response) *Response {
// ResponseCombiner func to merge the collected responses into a single one
type ResponseCombiner func(context.Context, int, []*Response) *Response

// RegisterResponseCombiner adds a new response combiner into the internal register
func RegisterResponseCombiner(name string, f ResponseCombiner) {
responseCombinersMutex.Lock()
responseCombiners[name] = f
responseCombinersMutex.Unlock()
}

const (
mergeKey = "combiner"
defaultCombinerName = "default"
)

var (
responseCombinersMutex = &sync.RWMutex{}
responseCombiners = map[string]ResponseCombiner{
defaultCombinerName: combineData,
}
)

func getResponseCombiner(extra config.ExtraConfig) ResponseCombiner {
responseCombinersMutex.RLock()
combiner := responseCombiners[defaultCombinerName]
if v, ok := extra[Namespace]; ok {
if e, ok := v.(map[string]interface{}); ok {
if v, ok := e[mergeKey]; ok {
if c, ok := responseCombiners[v.(string)]; ok {
combiner = c
}
}
}
}
responseCombinersMutex.RUnlock()
return combiner
}

func combineData(_ context.Context, total int, parts []*Response) *Response {
isComplete := len(parts) == total
var retResponse *Response
for _, part := range parts {
Expand Down
51 changes: 49 additions & 2 deletions proxy/merging_test.go
Expand Up @@ -66,10 +66,10 @@ func TestNewMergeDataMiddleware_mergeIncompleteResults(t *testing.T) {
t.Errorf("We were expecting a response but we got none\n")
default:
if len(out.Data) != 2 {
t.Errorf("We were expecting incomplete results merged but we got %v!\n", out)
t.Errorf("We were expecting incomplete results merged but we got %v!\n", out.Data)
}
if out.IsComplete {
t.Errorf("We were expecting an incomplete response but we got an incompleted one!\n")
t.Errorf("We were expecting an incomplete response but we got a completed one!\n")
}
}
}
Expand Down Expand Up @@ -273,3 +273,50 @@ func TestNewMergeDataMiddleware_noBackends(t *testing.T) {
endpoint := config.EndpointConfig{}
NewMergeDataMiddleware(&endpoint)
}
func TestRegisterResponseCombiner(t *testing.T) {
subject := "test combiner"
if len(responseCombiners) != 1 {
t.Error("unexpected initial size of the response combiner list:", responseCombiners)
}
RegisterResponseCombiner(subject, getResponseCombiner(config.ExtraConfig{}))
defer delete(responseCombiners, subject)

if len(responseCombiners) != 2 {
t.Error("unexpected size of the response combiner list:", responseCombiners)
}
timeout := 500
backend := config.Backend{}
endpoint := config.EndpointConfig{
Backend: []*config.Backend{&backend, &backend},
Timeout: time.Duration(timeout) * time.Millisecond,
ExtraConfig: config.ExtraConfig{
Namespace: map[string]interface{}{
mergeKey: defaultCombinerName,
},
},
}
mw := NewMergeDataMiddleware(&endpoint)
p := mw(
dummyProxy(&Response{Data: map[string]interface{}{"supu": 42}, IsComplete: true}),
dummyProxy(&Response{Data: map[string]interface{}{"tupu": true}, IsComplete: true}))
mustEnd := time.After(time.Duration(2*timeout) * time.Millisecond)
out, err := p(context.Background(), &Request{})
if err != nil {
t.Errorf("The middleware propagated an unexpected error: %s\n", err.Error())
}
if out == nil {
t.Errorf("The proxy returned a null result\n")
return
}
select {
case <-mustEnd:
t.Errorf("We were expecting a response but we got none\n")
default:
if len(out.Data) != 2 {
t.Errorf("We weren't expecting a partial response but we got %v!\n", out)
}
if !out.IsComplete {
t.Errorf("We were expecting a completed response but we got an incompleted one!\n")
}
}
}
3 changes: 3 additions & 0 deletions proxy/proxy.go
Expand Up @@ -9,6 +9,9 @@ import (
"github.com/devopsfaith/krakend/config"
)

// Namespace to be used in extra config
const Namespace = "github.com/devopsfaith/krakend/proxy"

// Metadata is the Metadata of the Response which contains Headers and StatusCode
type Metadata struct {
Headers map[string][]string
Expand Down

0 comments on commit fc26cc3

Please sign in to comment.