Skip to content

Commit

Permalink
response combiner register added to the proxy register
Browse files Browse the repository at this point in the history
  • Loading branch information
kpacha committed Apr 8, 2018
1 parent 58cac1c commit 8b93eae
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 19 deletions.
22 changes: 8 additions & 14 deletions proxy/merging.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package proxy

import (
"context"
"sync"
"time"

"github.com/devopsfaith/krakend/config"
Expand Down Expand Up @@ -84,36 +83,31 @@ type ResponseCombiner func(context.Context, int, []*Response) *Response

// RegisterResponseCombiner adds a new response combiner into the internal register
func RegisterResponseCombiner(name string, f ResponseCombiner) {
responseCombinersMutex.Lock()
responseCombiners[name] = f
responseCombinersMutex.Unlock()
responseCombiners.SetResponseCombiner(name, f)
}

const (
mergeKey = "combiner"
defaultCombinerName = "default"
)

var (
responseCombinersMutex = &sync.RWMutex{}
responseCombiners = map[string]ResponseCombiner{
defaultCombinerName: combineData,
}
)
var responseCombiners = initResponseCombiners()

func initResponseCombiners() *combinerRegister {
return newCombinerRegister(map[string]ResponseCombiner{defaultCombinerName: combineData}, combineData)
}

func getResponseCombiner(extra config.ExtraConfig) ResponseCombiner {
responseCombinersMutex.RLock()
combiner := responseCombiners[defaultCombinerName]
combiner, _ := responseCombiners.GetResponseCombiner(defaultCombinerName)
if v, ok := extra[Namespace]; ok {
if e, ok := v.(map[string]interface{}); ok {
if v, ok := e[mergeKey]; ok {
if c, ok := responseCombiners[v.(string)]; ok {
if c, ok := responseCombiners.GetResponseCombiner(v.(string)); ok {
combiner = c
}
}
}
}
responseCombinersMutex.RUnlock()
return combiner
}

Expand Down
10 changes: 5 additions & 5 deletions proxy/merging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,14 +275,14 @@ func TestNewMergeDataMiddleware_noBackends(t *testing.T) {
}
func TestRegisterResponseCombiner(t *testing.T) {
subject := "test combiner"
if len(responseCombiners) != 1 {
t.Error("unexpected initial size of the response combiner list:", responseCombiners)
if len(responseCombiners.data.Clone()) != 1 {
t.Error("unexpected initial size of the response combiner list:", responseCombiners.data.Clone())
}
RegisterResponseCombiner(subject, getResponseCombiner(config.ExtraConfig{}))
defer delete(responseCombiners, subject)
defer func() { responseCombiners = initResponseCombiners() }()

if len(responseCombiners) != 2 {
t.Error("unexpected size of the response combiner list:", responseCombiners)
if len(responseCombiners.data.Clone()) != 2 {
t.Error("unexpected size of the response combiner list:", responseCombiners.data.Clone())
}
timeout := 500
backend := config.Backend{}
Expand Down
48 changes: 48 additions & 0 deletions proxy/register.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package proxy

import (
"github.com/devopsfaith/krakend/register"
)

type ResponseCombinerRegister interface {
GetResponseCombiner(string) (ResponseCombiner, bool)
SetResponseCombiner(string, ResponseCombiner)
}

func NewRegister() *Register {
return &Register{
responseCombiners,
}
}

type Register struct {
*combinerRegister
}

type combinerRegister struct {
data register.Untyped
fallback ResponseCombiner
}

func newCombinerRegister(data map[string]ResponseCombiner, fallback ResponseCombiner) *combinerRegister {
r := register.NewUntyped()
for k, v := range data {
r.Register(k, v)
}
return &combinerRegister{r, fallback}
}

func (r *combinerRegister) GetResponseCombiner(name string) (ResponseCombiner, bool) {
v, ok := r.data.Get(name)
if !ok {
return r.fallback, ok
}
if rc, ok := v.(ResponseCombiner); ok {
return rc, ok
}
return r.fallback, ok
}

func (r *combinerRegister) SetResponseCombiner(name string, rc ResponseCombiner) {
r.data.Register(name, rc)
}
79 changes: 79 additions & 0 deletions proxy/register_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package proxy

import (
"context"
"testing"
)

func TestNewRegister_responseCombiner_ok(t *testing.T) {
r := NewRegister()
r.SetResponseCombiner("name1", func(_ context.Context, total int, parts []*Response) *Response {
if total < 0 || total >= len(parts) {
return nil
}
return parts[total]
})

rc, ok := r.GetResponseCombiner("name1")
if !ok {
t.Error("expecting response combiner")
return
}

result := rc(context.Background(), 0, []*Response{{IsComplete: true, Data: map[string]interface{}{"a": 42}}})

if result == nil {
t.Error("expecting result")
return
}

if !result.IsComplete {
t.Error("expecting a complete result")
return
}

if len(result.Data) != 1 {
t.Error("unexpected result size:", len(result.Data))
return
}
}

func TestNewRegister_responseCombiner_fallbackIfErrored(t *testing.T) {
r := NewRegister()

r.data.Register("errored", true)

rc, ok := r.GetResponseCombiner("errored")
if !ok {
t.Error("expecting response combiner")
return
}

original := &Response{IsComplete: true, Data: map[string]interface{}{"a": 42}}

result := rc(context.Background(), 0, []*Response{original})

if result != original {
t.Error("unexpected result:", result)
return
}
}

func TestNewRegister_responseCombiner_fallbackIfUnknown(t *testing.T) {
r := NewRegister()

rc, ok := r.GetResponseCombiner("unkown")
if ok {
t.Error("the response combiner should not be found")
return
}

original := &Response{IsComplete: true, Data: map[string]interface{}{"a": 42}}

result := rc(context.Background(), 0, []*Response{original})

if result != original {
t.Error("unexpected result:", result)
return
}
}

0 comments on commit 8b93eae

Please sign in to comment.