From e0436f9401eef79ccdd355431fe44021814a2834 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Fri, 5 Mar 2021 16:31:52 +0800 Subject: [PATCH] lint --- pulsar/client_impl_test.go | 6 +-- pulsar/internal/lookup_service.go | 16 +++--- pulsar/internal/lookup_service_test.go | 6 +-- pulsar/internal/rpc_client.go | 8 +-- pulsar/internal/service_name_resolver.go | 20 +++---- pulsar/internal/service_name_resolver_test.go | 9 ++-- pulsar/internal/service_uri.go | 14 ++--- pulsar/internal/service_uri_test.go | 52 ++++++++++++------- pulsar/reader_test.go | 3 +- 9 files changed, 75 insertions(+), 59 deletions(-) diff --git a/pulsar/client_impl_test.go b/pulsar/client_impl_test.go index d3e29c7226..37d7e198cf 100644 --- a/pulsar/client_impl_test.go +++ b/pulsar/client_impl_test.go @@ -495,9 +495,9 @@ func TestRetryWithMultipleHosts(t *testing.T) { assert.Equal(t, 10, len(msgIds)) consumer, err := client.Subscribe(ConsumerOptions{ - Topic: topic, - SubscriptionName: "retry-multi-hosts-sub", - Type: Shared, + Topic: topic, + SubscriptionName: "retry-multi-hosts-sub", + Type: Shared, SubscriptionInitialPosition: SubscriptionPositionEarliest, }) assert.Nil(t, err) diff --git a/pulsar/internal/lookup_service.go b/pulsar/internal/lookup_service.go index 89a6910ad1..c688771d95 100644 --- a/pulsar/internal/lookup_service.go +++ b/pulsar/internal/lookup_service.go @@ -41,22 +41,22 @@ type LookupService interface { } type lookupService struct { - rpcClient RPCClient + rpcClient RPCClient serviceNameResolver ServiceNameResolver - tlsEnabled bool - log log.Logger - metrics *Metrics + tlsEnabled bool + log log.Logger + metrics *Metrics } // NewLookupService init a lookup service struct and return an object of LookupService. func NewLookupService(rpcClient RPCClient, serviceURL *url.URL, serviceNameResolver ServiceNameResolver, tlsEnabled bool, logger log.Logger, metrics *Metrics) LookupService { return &lookupService{ - rpcClient: rpcClient, + rpcClient: rpcClient, serviceNameResolver: serviceNameResolver, - tlsEnabled: tlsEnabled, - log: logger.SubLogger(log.Fields{"serviceURL": serviceURL}), - metrics: metrics, + tlsEnabled: tlsEnabled, + log: logger.SubLogger(log.Fields{"serviceURL": serviceURL}), + metrics: metrics, } } diff --git a/pulsar/internal/lookup_service_test.go b/pulsar/internal/lookup_service_test.go index 69651bfcda..3189b6a600 100644 --- a/pulsar/internal/lookup_service_test.go +++ b/pulsar/internal/lookup_service_test.go @@ -131,7 +131,7 @@ func TestLookupSuccess(t *testing.T) { BrokerServiceUrl: proto.String("pulsar://broker-1:6650"), }, }, - }, url, serviceNameResolver,false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{})) + }, url, serviceNameResolver, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{})) lr, err := ls.Lookup("my-topic") assert.NoError(t, err) @@ -164,7 +164,7 @@ func TestTlsLookupSuccess(t *testing.T) { BrokerServiceUrlTls: proto.String("pulsar+ssl://broker-1:6651"), }, }, - }, url, serviceNameResolver,true, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{})) + }, url, serviceNameResolver, true, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{})) lr, err := ls.Lookup("my-topic") assert.NoError(t, err) @@ -410,7 +410,7 @@ func TestLookupSuccessWithMultipleHosts(t *testing.T) { BrokerServiceUrl: proto.String("pulsar://broker-1:6650"), }, }, - }, url, serviceNameResolver,false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{})) + }, url, serviceNameResolver, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{})) lr, err := ls.Lookup("my-topic") assert.NoError(t, err) diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go index 7cd5bce0eb..b51e4e3d91 100644 --- a/pulsar/internal/rpc_client.go +++ b/pulsar/internal/rpc_client.go @@ -70,10 +70,10 @@ func NewRPCClient(serviceURL *url.URL, serviceNameResolver ServiceNameResolver, requestTimeout time.Duration, logger log.Logger, metrics *Metrics) RPCClient { return &rpcClient{ serviceNameResolver: serviceNameResolver, - pool: pool, - requestTimeout: requestTimeout, - log: logger.SubLogger(log.Fields{"serviceURL": serviceURL}), - metrics: metrics, + pool: pool, + requestTimeout: requestTimeout, + log: logger.SubLogger(log.Fields{"serviceURL": serviceURL}), + metrics: metrics, } } diff --git a/pulsar/internal/service_name_resolver.go b/pulsar/internal/service_name_resolver.go index b67f5197bd..a131577c9c 100644 --- a/pulsar/internal/service_name_resolver.go +++ b/pulsar/internal/service_name_resolver.go @@ -20,11 +20,12 @@ package internal import ( "errors" "fmt" - log "github.com/sirupsen/logrus" "math/rand" "net/url" "sync/atomic" "time" + + log "github.com/sirupsen/logrus" ) // Map: PulsarServiceNameResolver.java @@ -39,10 +40,10 @@ type ServiceNameResolver interface { } type PulsarServiceNameResolver struct { - ServiceURI *PulsarServiceURI - ServiceURL *url.URL + ServiceURI *PulsarServiceURI + ServiceURL *url.URL CurrentIndex int32 - AddressList []*url.URL + AddressList []*url.URL } func NewPulsarServiceNameResolver(url *url.URL) ServiceNameResolver { @@ -55,19 +56,18 @@ func NewPulsarServiceNameResolver(url *url.URL) ServiceNameResolver { } func (r *PulsarServiceNameResolver) ResolveHost() (*url.URL, error) { - if r.AddressList == nil { + if r.AddressList == nil { return nil, errors.New("no service url is provided yet") } if len(r.AddressList) == 0 { - return nil, errors.New(fmt.Sprintf("no hosts found for service url : %v", r.ServiceURL)) + return nil, fmt.Errorf("no hosts found for service url : %v", r.ServiceURL) } if len(r.AddressList) == 1 { return r.AddressList[0], nil - } else { - idx := (r.CurrentIndex + 1) % int32(len(r.AddressList)) - atomic.StoreInt32(&r.CurrentIndex, idx) - return r.AddressList[idx], nil } + idx := (r.CurrentIndex + 1) % int32(len(r.AddressList)) + atomic.StoreInt32(&r.CurrentIndex, idx) + return r.AddressList[idx], nil } func (r *PulsarServiceNameResolver) ResolveHostUri() (*PulsarServiceURI, error) { diff --git a/pulsar/internal/service_name_resolver_test.go b/pulsar/internal/service_name_resolver_test.go index ba6b96f2b2..c40c75467f 100644 --- a/pulsar/internal/service_name_resolver_test.go +++ b/pulsar/internal/service_name_resolver_test.go @@ -18,9 +18,10 @@ package internal import ( - "github.com/stretchr/testify/assert" "net/url" "testing" + + "github.com/stretchr/testify/assert" ) func TestResolveBeforeUpdateServiceUrl(t *testing.T) { @@ -92,7 +93,7 @@ func TestMultipleHostsUrl(t *testing.T) { assert.Contains(t, resolver.GetAddressList(), host2) hosts := []*url.URL{host1, host2} hosturis := []*PulsarServiceURI{host1uri, host2uri} - for i := 0; i<10; i++{ + for i := 0; i < 10; i++ { host, err := resolver.ResolveHost() assert.Nil(t, err) hosturi, err := resolver.ResolveHostUri() @@ -119,7 +120,7 @@ func TestMultipleHostsTlsUrl(t *testing.T) { assert.Contains(t, resolver.GetAddressList(), host2) hosts := []*url.URL{host1, host2} hosturis := []*PulsarServiceURI{host1uri, host2uri} - for i := 0; i<10; i++{ + for i := 0; i < 10; i++ { host, err := resolver.ResolveHost() assert.Nil(t, err) hosturi, err := resolver.ResolveHostUri() @@ -127,4 +128,4 @@ func TestMultipleHostsTlsUrl(t *testing.T) { assert.Contains(t, hosts, host) assert.Contains(t, hosturis, hosturi) } -} \ No newline at end of file +} diff --git a/pulsar/internal/service_uri.go b/pulsar/internal/service_uri.go index 08046ad9fd..c4181401a3 100644 --- a/pulsar/internal/service_uri.go +++ b/pulsar/internal/service_uri.go @@ -20,10 +20,11 @@ package internal import ( "errors" "fmt" - log "github.com/sirupsen/logrus" "net" "net/url" "strings" + + log "github.com/sirupsen/logrus" ) const ( @@ -36,13 +37,14 @@ const ( HttpPort = 80 HttpsPort = 443 ) + // map to ServiceURI.java type PulsarServiceURI struct { - ServiceName string + ServiceName string ServiceInfos []string ServiceHosts []string - servicePath string - URL *url.URL + servicePath string + URL *url.URL } func NewPulsarServiceURIFromUriString(uri string) (*PulsarServiceURI, error) { @@ -181,7 +183,7 @@ func validateHostName(serviceName string, serviceInfos []string, hostname string if uri.Port() == "" { p := getServicePort(serviceName, serviceInfos) if p == -1 { - return "", errors.New(fmt.Sprintf("Invalid port : %d", p)) + return "", fmt.Errorf("invalid port : %d", p) } port = fmt.Sprint(p) } @@ -207,4 +209,4 @@ func getServicePort(serviceName string, serviceInfos []string) int { return HttpsPort } return -1 -} \ No newline at end of file +} diff --git a/pulsar/internal/service_uri_test.go b/pulsar/internal/service_uri_test.go index a54bce7085..24b97ec25d 100644 --- a/pulsar/internal/service_uri_test.go +++ b/pulsar/internal/service_uri_test.go @@ -18,18 +18,19 @@ package internal import ( - "github.com/stretchr/testify/assert" "testing" + + "github.com/stretchr/testify/assert" ) func TestInvalidServiceUris(t *testing.T) { uris := []string{ - "://localhost:6650", // missing scheme - "pulsar:///", // missing authority - "pulsar://localhost:6650:6651/", // invalid hostname pair - "pulsar://localhost:xyz/", // invalid port - "pulsar://localhost:-6650/", // negative port - "pulsar://fec0:0:0:ffff::1:6650", // missing brackets + "://localhost:6650", // missing scheme + "pulsar:///", // missing authority + "pulsar://localhost:6650:6651/", // invalid hostname pair + "pulsar://localhost:xyz/", // invalid port + "pulsar://localhost:-6650/", // negative port + "pulsar://fec0:0:0:ffff::1:6650", // missing brackets } for _, uri := range uris { @@ -71,7 +72,8 @@ func TestUserInfo(t *testing.T) { func TestIpv6Uri(t *testing.T) { serviceUri := "pulsar://pulsaruser@[fec0:0:0:ffff::1]:6650/path/to/namespace" - assertServiceUri(t, serviceUri, "pulsar", nil, []string{"[fec0:0:0:ffff::1]:6650"}, "/path/to/namespace", "pulsaruser") + assertServiceUri(t, serviceUri, "pulsar", nil, []string{"[fec0:0:0:ffff::1]:6650"}, "/path/to/namespace", + "pulsaruser") } func TestIpv6UriWithoutPulsarPort(t *testing.T) { @@ -81,32 +83,40 @@ func TestIpv6UriWithoutPulsarPort(t *testing.T) { func TestMultiIpv6Uri(t *testing.T) { serviceUri := "pulsar://pulsaruser@[fec0:0:0:ffff::1]:6650,[fec0:0:0:ffff::2]:6650;[fec0:0:0:ffff::3]:6650/path/to/namespace" - assertServiceUri(t, serviceUri, "pulsar", nil, []string{"[fec0:0:0:ffff::1]:6650", "[fec0:0:0:ffff::2]:6650", "[fec0:0:0:ffff::3]:6650"}, "/path/to/namespace", "pulsaruser") + assertServiceUri(t, serviceUri, "pulsar", nil, + []string{"[fec0:0:0:ffff::1]:6650", "[fec0:0:0:ffff::2]:6650", "[fec0:0:0:ffff::3]:6650"}, "/path/to/namespace", + "pulsaruser") } func TestMultiIpv6UriWithoutPulsarPort(t *testing.T) { serviceUri := "pulsar://pulsaruser@[fec0:0:0:ffff::1],[fec0:0:0:ffff::2];[fec0:0:0:ffff::3]/path/to/namespace" - assertServiceUri(t, serviceUri, "pulsar", nil, []string{"[fec0:0:0:ffff::1]:6650", "[fec0:0:0:ffff::2]:6650", "[fec0:0:0:ffff::3]:6650"}, "/path/to/namespace", "pulsaruser") + assertServiceUri(t, serviceUri, "pulsar", nil, + []string{"[fec0:0:0:ffff::1]:6650", "[fec0:0:0:ffff::2]:6650", "[fec0:0:0:ffff::3]:6650"}, "/path/to/namespace", + "pulsaruser") } func TestMultipleHostsSemiColon(t *testing.T) { serviceUri := "pulsar://host1:6650;host2:6650;host3:6650/path/to/namespace" - assertServiceUri(t, serviceUri, "pulsar", nil, []string{"host1:6650", "host2:6650", "host3:6650"}, "/path/to/namespace", "") + assertServiceUri(t, serviceUri, "pulsar", nil, []string{"host1:6650", "host2:6650", "host3:6650"}, + "/path/to/namespace", "") } func TestMultipleHostsComma(t *testing.T) { serviceUri := "pulsar://host1:6650,host2:6650,host3:6650/path/to/namespace" - assertServiceUri(t, serviceUri, "pulsar", nil, []string{"host1:6650", "host2:6650", "host3:6650"}, "/path/to/namespace", "") + assertServiceUri(t, serviceUri, "pulsar", nil, []string{"host1:6650", "host2:6650", "host3:6650"}, + "/path/to/namespace", "") } func TestMultipleHostsWithoutPulsarPorts(t *testing.T) { serviceUri := "pulsar://host1,host2,host3/path/to/namespace" - assertServiceUri(t, serviceUri, "pulsar", nil, []string{"host1:6650", "host2:6650", "host3:6650"}, "/path/to/namespace", "") + assertServiceUri(t, serviceUri, "pulsar", nil, []string{"host1:6650", "host2:6650", "host3:6650"}, + "/path/to/namespace", "") } func TestMultipleHostsWithoutPulsarTlsPorts(t *testing.T) { serviceUri := "pulsar+ssl://host1,host2,host3/path/to/namespace" - assertServiceUri(t, serviceUri, "pulsar", []string{"ssl"}, []string{"host1:6651", "host2:6651", "host3:6651"}, "/path/to/namespace", "") + assertServiceUri(t, serviceUri, "pulsar", []string{"ssl"}, []string{"host1:6651", "host2:6651", "host3:6651"}, + "/path/to/namespace", "") } func TestMultipleHostsWithoutHttpPorts(t *testing.T) { @@ -116,22 +126,26 @@ func TestMultipleHostsWithoutHttpPorts(t *testing.T) { func TestMultipleHostsWithoutHttpsPorts(t *testing.T) { serviceUri := "https://host1,host2,host3/path/to/namespace" - assertServiceUri(t, serviceUri, "https", nil, []string{"host1:443", "host2:443", "host3:443"}, "/path/to/namespace", "") + assertServiceUri(t, serviceUri, "https", nil, []string{"host1:443", "host2:443", "host3:443"}, "/path/to/namespace", + "") } func TestMultipleHostsMixedPorts(t *testing.T) { serviceUri := "pulsar://host1:6640,host2:6650,host3:6660/path/to/namespace" - assertServiceUri(t, serviceUri, "pulsar", nil, []string{"host1:6640", "host2:6650", "host3:6660"}, "/path/to/namespace", "") + assertServiceUri(t, serviceUri, "pulsar", nil, []string{"host1:6640", "host2:6650", "host3:6660"}, + "/path/to/namespace", "") } func TestMultipleHostsMixed(t *testing.T) { serviceUri := "pulsar://host1:6640,host2,host3:6660/path/to/namespace" - assertServiceUri(t, serviceUri, "pulsar", nil, []string{"host1:6640", "host2:6650", "host3:6660"}, "/path/to/namespace", "") + assertServiceUri(t, serviceUri, "pulsar", nil, []string{"host1:6640", "host2:6650", "host3:6660"}, + "/path/to/namespace", "") } func TestUserInfoWithMultipleHosts(t *testing.T) { serviceUri := "pulsar://pulsaruser@host1:6650;host2:6650;host3:6650/path/to/namespace" - assertServiceUri(t, serviceUri, "pulsar", nil, []string{"host1:6650", "host2:6650", "host3:6650"}, "/path/to/namespace", "pulsaruser") + assertServiceUri(t, serviceUri, "pulsar", nil, []string{"host1:6650", "host2:6650", "host3:6650"}, + "/path/to/namespace", "pulsaruser") } func testInvalidServiceUri(t *testing.T, serviceUri string) { @@ -153,4 +167,4 @@ func assertServiceUri(t *testing.T, serviceUri, expectedServiceName string, } assert.ElementsMatch(t, expectedServiceInfo, uri.ServiceInfos) assert.ElementsMatch(t, expectedServiceHosts, uri.ServiceHosts) -} \ No newline at end of file +} diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index c64f4a8f1c..f72ba1d1a0 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -587,7 +587,6 @@ func TestReaderLatestInclusiveHasNext(t *testing.T) { assert.False(t, reader.HasNext()) } - func TestReaderWithMultiHosts(t *testing.T) { // Multi hosts included an unreached port and the actual port for verify retry logic client, err := NewClient(ClientOptions{ @@ -638,4 +637,4 @@ func TestReaderWithMultiHosts(t *testing.T) { } assert.Equal(t, 10, i) -} \ No newline at end of file +}