Skip to content

Commit

Permalink
lint
Browse files Browse the repository at this point in the history
  • Loading branch information
freeznet committed Mar 5, 2021
1 parent 993f1f3 commit e0436f9
Show file tree
Hide file tree
Showing 9 changed files with 75 additions and 59 deletions.
6 changes: 3 additions & 3 deletions pulsar/client_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,9 +495,9 @@ func TestRetryWithMultipleHosts(t *testing.T) {
assert.Equal(t, 10, len(msgIds))

consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "retry-multi-hosts-sub",
Type: Shared,
Topic: topic,
SubscriptionName: "retry-multi-hosts-sub",
Type: Shared,
SubscriptionInitialPosition: SubscriptionPositionEarliest,
})
assert.Nil(t, err)
Expand Down
16 changes: 8 additions & 8 deletions pulsar/internal/lookup_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,22 +41,22 @@ type LookupService interface {
}

type lookupService struct {
rpcClient RPCClient
rpcClient RPCClient
serviceNameResolver ServiceNameResolver
tlsEnabled bool
log log.Logger
metrics *Metrics
tlsEnabled bool
log log.Logger
metrics *Metrics
}

// NewLookupService init a lookup service struct and return an object of LookupService.
func NewLookupService(rpcClient RPCClient, serviceURL *url.URL, serviceNameResolver ServiceNameResolver,
tlsEnabled bool, logger log.Logger, metrics *Metrics) LookupService {
return &lookupService{
rpcClient: rpcClient,
rpcClient: rpcClient,
serviceNameResolver: serviceNameResolver,
tlsEnabled: tlsEnabled,
log: logger.SubLogger(log.Fields{"serviceURL": serviceURL}),
metrics: metrics,
tlsEnabled: tlsEnabled,
log: logger.SubLogger(log.Fields{"serviceURL": serviceURL}),
metrics: metrics,
}
}

Expand Down
6 changes: 3 additions & 3 deletions pulsar/internal/lookup_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func TestLookupSuccess(t *testing.T) {
BrokerServiceUrl: proto.String("pulsar://broker-1:6650"),
},
},
}, url, serviceNameResolver,false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
}, url, serviceNameResolver, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))

lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
Expand Down Expand Up @@ -164,7 +164,7 @@ func TestTlsLookupSuccess(t *testing.T) {
BrokerServiceUrlTls: proto.String("pulsar+ssl://broker-1:6651"),
},
},
}, url, serviceNameResolver,true, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
}, url, serviceNameResolver, true, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))

lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
Expand Down Expand Up @@ -410,7 +410,7 @@ func TestLookupSuccessWithMultipleHosts(t *testing.T) {
BrokerServiceUrl: proto.String("pulsar://broker-1:6650"),
},
},
}, url, serviceNameResolver,false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
}, url, serviceNameResolver, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))

lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
Expand Down
8 changes: 4 additions & 4 deletions pulsar/internal/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ func NewRPCClient(serviceURL *url.URL, serviceNameResolver ServiceNameResolver,
requestTimeout time.Duration, logger log.Logger, metrics *Metrics) RPCClient {
return &rpcClient{
serviceNameResolver: serviceNameResolver,
pool: pool,
requestTimeout: requestTimeout,
log: logger.SubLogger(log.Fields{"serviceURL": serviceURL}),
metrics: metrics,
pool: pool,
requestTimeout: requestTimeout,
log: logger.SubLogger(log.Fields{"serviceURL": serviceURL}),
metrics: metrics,
}
}

Expand Down
20 changes: 10 additions & 10 deletions pulsar/internal/service_name_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ package internal
import (
"errors"
"fmt"
log "github.com/sirupsen/logrus"
"math/rand"
"net/url"
"sync/atomic"
"time"

log "github.com/sirupsen/logrus"
)

// Map: PulsarServiceNameResolver.java
Expand All @@ -39,10 +40,10 @@ type ServiceNameResolver interface {
}

type PulsarServiceNameResolver struct {
ServiceURI *PulsarServiceURI
ServiceURL *url.URL
ServiceURI *PulsarServiceURI
ServiceURL *url.URL
CurrentIndex int32
AddressList []*url.URL
AddressList []*url.URL
}

func NewPulsarServiceNameResolver(url *url.URL) ServiceNameResolver {
Expand All @@ -55,19 +56,18 @@ func NewPulsarServiceNameResolver(url *url.URL) ServiceNameResolver {
}

func (r *PulsarServiceNameResolver) ResolveHost() (*url.URL, error) {
if r.AddressList == nil {
if r.AddressList == nil {
return nil, errors.New("no service url is provided yet")
}
if len(r.AddressList) == 0 {
return nil, errors.New(fmt.Sprintf("no hosts found for service url : %v", r.ServiceURL))
return nil, fmt.Errorf("no hosts found for service url : %v", r.ServiceURL)
}
if len(r.AddressList) == 1 {
return r.AddressList[0], nil
} else {
idx := (r.CurrentIndex + 1) % int32(len(r.AddressList))
atomic.StoreInt32(&r.CurrentIndex, idx)
return r.AddressList[idx], nil
}
idx := (r.CurrentIndex + 1) % int32(len(r.AddressList))
atomic.StoreInt32(&r.CurrentIndex, idx)
return r.AddressList[idx], nil
}

func (r *PulsarServiceNameResolver) ResolveHostUri() (*PulsarServiceURI, error) {
Expand Down
9 changes: 5 additions & 4 deletions pulsar/internal/service_name_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
package internal

import (
"github.com/stretchr/testify/assert"
"net/url"
"testing"

"github.com/stretchr/testify/assert"
)

func TestResolveBeforeUpdateServiceUrl(t *testing.T) {
Expand Down Expand Up @@ -92,7 +93,7 @@ func TestMultipleHostsUrl(t *testing.T) {
assert.Contains(t, resolver.GetAddressList(), host2)
hosts := []*url.URL{host1, host2}
hosturis := []*PulsarServiceURI{host1uri, host2uri}
for i := 0; i<10; i++{
for i := 0; i < 10; i++ {
host, err := resolver.ResolveHost()
assert.Nil(t, err)
hosturi, err := resolver.ResolveHostUri()
Expand All @@ -119,12 +120,12 @@ func TestMultipleHostsTlsUrl(t *testing.T) {
assert.Contains(t, resolver.GetAddressList(), host2)
hosts := []*url.URL{host1, host2}
hosturis := []*PulsarServiceURI{host1uri, host2uri}
for i := 0; i<10; i++{
for i := 0; i < 10; i++ {
host, err := resolver.ResolveHost()
assert.Nil(t, err)
hosturi, err := resolver.ResolveHostUri()
assert.Nil(t, err)
assert.Contains(t, hosts, host)
assert.Contains(t, hosturis, hosturi)
}
}
}
14 changes: 8 additions & 6 deletions pulsar/internal/service_uri.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ package internal
import (
"errors"
"fmt"
log "github.com/sirupsen/logrus"
"net"
"net/url"
"strings"

log "github.com/sirupsen/logrus"
)

const (
Expand All @@ -36,13 +37,14 @@ const (
HttpPort = 80
HttpsPort = 443
)

// map to ServiceURI.java
type PulsarServiceURI struct {
ServiceName string
ServiceName string
ServiceInfos []string
ServiceHosts []string
servicePath string
URL *url.URL
servicePath string
URL *url.URL
}

func NewPulsarServiceURIFromUriString(uri string) (*PulsarServiceURI, error) {
Expand Down Expand Up @@ -181,7 +183,7 @@ func validateHostName(serviceName string, serviceInfos []string, hostname string
if uri.Port() == "" {
p := getServicePort(serviceName, serviceInfos)
if p == -1 {
return "", errors.New(fmt.Sprintf("Invalid port : %d", p))
return "", fmt.Errorf("invalid port : %d", p)
}
port = fmt.Sprint(p)
}
Expand All @@ -207,4 +209,4 @@ func getServicePort(serviceName string, serviceInfos []string) int {
return HttpsPort
}
return -1
}
}
52 changes: 33 additions & 19 deletions pulsar/internal/service_uri_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,19 @@
package internal

import (
"github.com/stretchr/testify/assert"
"testing"

"github.com/stretchr/testify/assert"
)

func TestInvalidServiceUris(t *testing.T) {
uris := []string{
"://localhost:6650", // missing scheme
"pulsar:///", // missing authority
"pulsar://localhost:6650:6651/", // invalid hostname pair
"pulsar://localhost:xyz/", // invalid port
"pulsar://localhost:-6650/", // negative port
"pulsar://fec0:0:0:ffff::1:6650", // missing brackets
"://localhost:6650", // missing scheme
"pulsar:///", // missing authority
"pulsar://localhost:6650:6651/", // invalid hostname pair
"pulsar://localhost:xyz/", // invalid port
"pulsar://localhost:-6650/", // negative port
"pulsar://fec0:0:0:ffff::1:6650", // missing brackets
}

for _, uri := range uris {
Expand Down Expand Up @@ -71,7 +72,8 @@ func TestUserInfo(t *testing.T) {

func TestIpv6Uri(t *testing.T) {
serviceUri := "pulsar://pulsaruser@[fec0:0:0:ffff::1]:6650/path/to/namespace"
assertServiceUri(t, serviceUri, "pulsar", nil, []string{"[fec0:0:0:ffff::1]:6650"}, "/path/to/namespace", "pulsaruser")
assertServiceUri(t, serviceUri, "pulsar", nil, []string{"[fec0:0:0:ffff::1]:6650"}, "/path/to/namespace",
"pulsaruser")
}

func TestIpv6UriWithoutPulsarPort(t *testing.T) {
Expand All @@ -81,32 +83,40 @@ func TestIpv6UriWithoutPulsarPort(t *testing.T) {

func TestMultiIpv6Uri(t *testing.T) {
serviceUri := "pulsar://pulsaruser@[fec0:0:0:ffff::1]:6650,[fec0:0:0:ffff::2]:6650;[fec0:0:0:ffff::3]:6650/path/to/namespace"
assertServiceUri(t, serviceUri, "pulsar", nil, []string{"[fec0:0:0:ffff::1]:6650", "[fec0:0:0:ffff::2]:6650", "[fec0:0:0:ffff::3]:6650"}, "/path/to/namespace", "pulsaruser")
assertServiceUri(t, serviceUri, "pulsar", nil,
[]string{"[fec0:0:0:ffff::1]:6650", "[fec0:0:0:ffff::2]:6650", "[fec0:0:0:ffff::3]:6650"}, "/path/to/namespace",
"pulsaruser")
}

func TestMultiIpv6UriWithoutPulsarPort(t *testing.T) {
serviceUri := "pulsar://pulsaruser@[fec0:0:0:ffff::1],[fec0:0:0:ffff::2];[fec0:0:0:ffff::3]/path/to/namespace"
assertServiceUri(t, serviceUri, "pulsar", nil, []string{"[fec0:0:0:ffff::1]:6650", "[fec0:0:0:ffff::2]:6650", "[fec0:0:0:ffff::3]:6650"}, "/path/to/namespace", "pulsaruser")
assertServiceUri(t, serviceUri, "pulsar", nil,
[]string{"[fec0:0:0:ffff::1]:6650", "[fec0:0:0:ffff::2]:6650", "[fec0:0:0:ffff::3]:6650"}, "/path/to/namespace",
"pulsaruser")
}

func TestMultipleHostsSemiColon(t *testing.T) {
serviceUri := "pulsar://host1:6650;host2:6650;host3:6650/path/to/namespace"
assertServiceUri(t, serviceUri, "pulsar", nil, []string{"host1:6650", "host2:6650", "host3:6650"}, "/path/to/namespace", "")
assertServiceUri(t, serviceUri, "pulsar", nil, []string{"host1:6650", "host2:6650", "host3:6650"},
"/path/to/namespace", "")
}

func TestMultipleHostsComma(t *testing.T) {
serviceUri := "pulsar://host1:6650,host2:6650,host3:6650/path/to/namespace"
assertServiceUri(t, serviceUri, "pulsar", nil, []string{"host1:6650", "host2:6650", "host3:6650"}, "/path/to/namespace", "")
assertServiceUri(t, serviceUri, "pulsar", nil, []string{"host1:6650", "host2:6650", "host3:6650"},
"/path/to/namespace", "")
}

func TestMultipleHostsWithoutPulsarPorts(t *testing.T) {
serviceUri := "pulsar://host1,host2,host3/path/to/namespace"
assertServiceUri(t, serviceUri, "pulsar", nil, []string{"host1:6650", "host2:6650", "host3:6650"}, "/path/to/namespace", "")
assertServiceUri(t, serviceUri, "pulsar", nil, []string{"host1:6650", "host2:6650", "host3:6650"},
"/path/to/namespace", "")
}

func TestMultipleHostsWithoutPulsarTlsPorts(t *testing.T) {
serviceUri := "pulsar+ssl://host1,host2,host3/path/to/namespace"
assertServiceUri(t, serviceUri, "pulsar", []string{"ssl"}, []string{"host1:6651", "host2:6651", "host3:6651"}, "/path/to/namespace", "")
assertServiceUri(t, serviceUri, "pulsar", []string{"ssl"}, []string{"host1:6651", "host2:6651", "host3:6651"},
"/path/to/namespace", "")
}

func TestMultipleHostsWithoutHttpPorts(t *testing.T) {
Expand All @@ -116,22 +126,26 @@ func TestMultipleHostsWithoutHttpPorts(t *testing.T) {

func TestMultipleHostsWithoutHttpsPorts(t *testing.T) {
serviceUri := "https://host1,host2,host3/path/to/namespace"
assertServiceUri(t, serviceUri, "https", nil, []string{"host1:443", "host2:443", "host3:443"}, "/path/to/namespace", "")
assertServiceUri(t, serviceUri, "https", nil, []string{"host1:443", "host2:443", "host3:443"}, "/path/to/namespace",
"")
}

func TestMultipleHostsMixedPorts(t *testing.T) {
serviceUri := "pulsar://host1:6640,host2:6650,host3:6660/path/to/namespace"
assertServiceUri(t, serviceUri, "pulsar", nil, []string{"host1:6640", "host2:6650", "host3:6660"}, "/path/to/namespace", "")
assertServiceUri(t, serviceUri, "pulsar", nil, []string{"host1:6640", "host2:6650", "host3:6660"},
"/path/to/namespace", "")
}

func TestMultipleHostsMixed(t *testing.T) {
serviceUri := "pulsar://host1:6640,host2,host3:6660/path/to/namespace"
assertServiceUri(t, serviceUri, "pulsar", nil, []string{"host1:6640", "host2:6650", "host3:6660"}, "/path/to/namespace", "")
assertServiceUri(t, serviceUri, "pulsar", nil, []string{"host1:6640", "host2:6650", "host3:6660"},
"/path/to/namespace", "")
}

func TestUserInfoWithMultipleHosts(t *testing.T) {
serviceUri := "pulsar://pulsaruser@host1:6650;host2:6650;host3:6650/path/to/namespace"
assertServiceUri(t, serviceUri, "pulsar", nil, []string{"host1:6650", "host2:6650", "host3:6650"}, "/path/to/namespace", "pulsaruser")
assertServiceUri(t, serviceUri, "pulsar", nil, []string{"host1:6650", "host2:6650", "host3:6650"},
"/path/to/namespace", "pulsaruser")
}

func testInvalidServiceUri(t *testing.T, serviceUri string) {
Expand All @@ -153,4 +167,4 @@ func assertServiceUri(t *testing.T, serviceUri, expectedServiceName string,
}
assert.ElementsMatch(t, expectedServiceInfo, uri.ServiceInfos)
assert.ElementsMatch(t, expectedServiceHosts, uri.ServiceHosts)
}
}
3 changes: 1 addition & 2 deletions pulsar/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,6 @@ func TestReaderLatestInclusiveHasNext(t *testing.T) {
assert.False(t, reader.HasNext())
}


func TestReaderWithMultiHosts(t *testing.T) {
// Multi hosts included an unreached port and the actual port for verify retry logic
client, err := NewClient(ClientOptions{
Expand Down Expand Up @@ -638,4 +637,4 @@ func TestReaderWithMultiHosts(t *testing.T) {
}

assert.Equal(t, 10, i)
}
}

0 comments on commit e0436f9

Please sign in to comment.