Skip to content

Commit

Permalink
Use io.Closer, allow nil Closers
Browse files Browse the repository at this point in the history
  • Loading branch information
peterbourgon committed Sep 27, 2015
1 parent 2766c93 commit 0ad6191
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 28 deletions.
5 changes: 3 additions & 2 deletions examples/stringsvc3/proxying.go
Expand Up @@ -3,6 +3,7 @@ package main
import (
"errors"
"fmt"
"io"
"net/url"
"strings"
"time"
Expand Down Expand Up @@ -64,12 +65,12 @@ func (mw proxymw) Uppercase(s string) (string, error) {
}

func factory(ctx context.Context, qps int) loadbalancer.Factory {
return func(instance string) (endpoint.Endpoint, loadbalancer.Closer, error) {
return func(instance string) (endpoint.Endpoint, io.Closer, error) {
var e endpoint.Endpoint
e = makeUppercaseProxy(ctx, instance)
e = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(e)
e = kitratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(float64(qps), int64(qps)))(e)
return e, make(loadbalancer.Closer), nil
return e, nil, nil
}
}

Expand Down
12 changes: 5 additions & 7 deletions loadbalancer/dnssrv/publisher_internal_test.go
Expand Up @@ -2,6 +2,7 @@ package dnssrv

import (
"errors"
"io"
"net"
"sync/atomic"
"testing"
Expand All @@ -10,7 +11,6 @@ import (
"golang.org/x/net/context"

"github.com/go-kit/kit/endpoint"
"github.com/go-kit/kit/loadbalancer"
"github.com/go-kit/kit/log"
)

Expand All @@ -19,8 +19,7 @@ func TestPublisher(t *testing.T) {
name = "foo"
ttl = time.Second
e = func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }
c = make(chan struct{})
factory = func(string) (endpoint.Endpoint, loadbalancer.Closer, error) { return e, c, nil }
factory = func(string) (endpoint.Endpoint, io.Closer, error) { return e, nil, nil }
logger = log.NewNopLogger()
)

Expand All @@ -41,8 +40,7 @@ func TestBadLookup(t *testing.T) {
name = "some-name"
ttl = time.Second
e = func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }
c = make(chan struct{})
factory = func(string) (endpoint.Endpoint, loadbalancer.Closer, error) { return e, c, nil }
factory = func(string) (endpoint.Endpoint, io.Closer, error) { return e, nil, nil }
logger = log.NewNopLogger()
)

Expand All @@ -64,7 +62,7 @@ func TestBadFactory(t *testing.T) {
addrs = []*net.SRV{addr}
name = "some-name"
ttl = time.Second
factory = func(string) (endpoint.Endpoint, loadbalancer.Closer, error) { return nil, nil, errors.New("kaboom") }
factory = func(string) (endpoint.Endpoint, io.Closer, error) { return nil, nil, errors.New("kaboom") }
logger = log.NewNopLogger()
)

Expand Down Expand Up @@ -97,7 +95,7 @@ func TestRefreshNoChange(t *testing.T) {
addrs = []*net.SRV{addr}
name = "my-name"
ttl = time.Second
factory = func(string) (endpoint.Endpoint, loadbalancer.Closer, error) { return nil, nil, errors.New("kaboom") }
factory = func(string) (endpoint.Endpoint, io.Closer, error) { return nil, nil, errors.New("kaboom") }
logger = log.NewNopLogger()
)

Expand Down
7 changes: 5 additions & 2 deletions loadbalancer/endpoint_cache.go
@@ -1,6 +1,7 @@
package loadbalancer

import (
"io"
"sync"

"github.com/go-kit/kit/endpoint"
Expand Down Expand Up @@ -37,7 +38,7 @@ func NewEndpointCache(f Factory, logger log.Logger) *EndpointCache {

type endpointCloser struct {
endpoint.Endpoint
Closer
io.Closer
}

// Replace replaces the current set of endpoints with endpoints manufactured
Expand Down Expand Up @@ -68,7 +69,9 @@ func (t *EndpointCache) Replace(instances []string) {

// Close any leftover endpoints.
for _, ec := range t.m {
close(ec.Closer)
if ec.Closer != nil {
ec.Closer.Close()
}
}

// Swap and GC.
Expand Down
13 changes: 9 additions & 4 deletions loadbalancer/endpoint_cache_test.go
@@ -1,6 +1,7 @@
package loadbalancer_test

import (
"io"
"testing"
"time"

Expand All @@ -14,10 +15,10 @@ import (
func TestEndpointCache(t *testing.T) {
var (
e = func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }
ca = make(loadbalancer.Closer)
cb = make(loadbalancer.Closer)
c = map[string]loadbalancer.Closer{"a": ca, "b": cb}
f = func(s string) (endpoint.Endpoint, loadbalancer.Closer, error) { return e, c[s], nil }
ca = make(closer)
cb = make(closer)
c = map[string]io.Closer{"a": ca, "b": cb}
f = func(s string) (endpoint.Endpoint, io.Closer, error) { return e, c[s], nil }
ec = loadbalancer.NewEndpointCache(f, log.NewNopLogger())
)

Expand Down Expand Up @@ -64,3 +65,7 @@ func TestEndpointCache(t *testing.T) {
t.Errorf("didn't close the deleted instance in time")
}
}

type closer chan struct{}

func (c closer) Close() error { close(c); return nil }
8 changes: 4 additions & 4 deletions loadbalancer/etcd/publisher_test.go
Expand Up @@ -2,13 +2,13 @@ package etcd_test

import (
"errors"
"io"
"testing"

stdetcd "github.com/coreos/go-etcd/etcd"
"golang.org/x/net/context"

"github.com/go-kit/kit/endpoint"
"github.com/go-kit/kit/loadbalancer"
kitetcd "github.com/go-kit/kit/loadbalancer/etcd"
"github.com/go-kit/kit/log"
)
Expand All @@ -32,8 +32,8 @@ func TestPublisher(t *testing.T) {
e = func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }
)

factory := func(string) (endpoint.Endpoint, loadbalancer.Closer, error) {
return e, make(loadbalancer.Closer), nil
factory := func(string) (endpoint.Endpoint, io.Closer, error) {
return e, nil, nil
}

client := &fakeClient{
Expand All @@ -54,7 +54,7 @@ func TestPublisher(t *testing.T) {
func TestBadFactory(t *testing.T) {
logger := log.NewNopLogger()

factory := func(string) (endpoint.Endpoint, loadbalancer.Closer, error) {
factory := func(string) (endpoint.Endpoint, io.Closer, error) {
return nil, nil, errors.New("kaboom")
}

Expand Down
12 changes: 6 additions & 6 deletions loadbalancer/factory.go
@@ -1,15 +1,15 @@
package loadbalancer

import "github.com/go-kit/kit/endpoint"
import (
"io"

"github.com/go-kit/kit/endpoint"
)

// Factory is a function that converts an instance string, e.g. a host:port,
// to a usable endpoint. Factories are used by load balancers to convert
// instances returned by Publishers (typically host:port strings) into
// endpoints. Users are expected to provide their own factory functions that
// assume specific transports, or can deduce transports by parsing the
// instance string.
type Factory func(instance string) (endpoint.Endpoint, Closer, error)

// Closer is returned by factory functions as a way to close a generated
// endpoint.
type Closer chan struct{}
type Factory func(instance string) (endpoint.Endpoint, io.Closer, error)
6 changes: 3 additions & 3 deletions loadbalancer/static/publisher_test.go
Expand Up @@ -2,12 +2,12 @@ package static_test

import (
"fmt"
"io"
"testing"

"golang.org/x/net/context"

"github.com/go-kit/kit/endpoint"
"github.com/go-kit/kit/loadbalancer"
"github.com/go-kit/kit/loadbalancer/static"
"github.com/go-kit/kit/log"
)
Expand All @@ -20,9 +20,9 @@ func TestStatic(t *testing.T) {
"bar": func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil },
"baz": func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil },
}
factory = func(instance string) (endpoint.Endpoint, loadbalancer.Closer, error) {
factory = func(instance string) (endpoint.Endpoint, io.Closer, error) {
if e, ok := endpoints[instance]; ok {
return e, make(loadbalancer.Closer), nil
return e, nil, nil
}
return nil, nil, fmt.Errorf("%s: not found", instance)
}
Expand Down

0 comments on commit 0ad6191

Please sign in to comment.