From 1c01ea02cb0289d72e2938107afce2bb27d64de3 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Tue, 29 Dec 2020 10:04:11 +0800 Subject: [PATCH 1/7] init --- pulsar/internal/service_name_resolver.go | 3 +++ pulsar/internal/service_uri.go | 3 +++ 2 files changed, 6 insertions(+) create mode 100644 pulsar/internal/service_name_resolver.go create mode 100644 pulsar/internal/service_uri.go diff --git a/pulsar/internal/service_name_resolver.go b/pulsar/internal/service_name_resolver.go new file mode 100644 index 0000000000..218c0e0b2d --- /dev/null +++ b/pulsar/internal/service_name_resolver.go @@ -0,0 +1,3 @@ +package internal + +// Map: PulsarServiceNameResolver.java diff --git a/pulsar/internal/service_uri.go b/pulsar/internal/service_uri.go new file mode 100644 index 0000000000..d94322ede7 --- /dev/null +++ b/pulsar/internal/service_uri.go @@ -0,0 +1,3 @@ +package internal + +// map to ServiceURI.java From a887edd76f327a4566a5ebeafa55161db21825d2 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Fri, 5 Mar 2021 13:46:50 +0800 Subject: [PATCH 2/7] implement multiple hosts & unit tests --- go.sum | 1 + pulsar/client_impl.go | 6 +- pulsar/internal/lookup_service.go | 11 +- pulsar/internal/lookup_service_test.go | 55 ++++- pulsar/internal/rpc_client.go | 22 +- pulsar/internal/service_name_resolver.go | 115 ++++++++++ pulsar/internal/service_name_resolver_test.go | 130 +++++++++++ pulsar/internal/service_uri.go | 207 ++++++++++++++++++ pulsar/internal/service_uri_test.go | 156 +++++++++++++ pulsar/producer_test.go | 2 +- 10 files changed, 682 insertions(+), 23 deletions(-) create mode 100644 pulsar/internal/service_name_resolver_test.go create mode 100644 pulsar/internal/service_uri_test.go diff --git a/go.sum b/go.sum index 7f61837d9f..a14857b2d2 100644 --- a/go.sum +++ b/go.sum @@ -27,6 +27,7 @@ github.com/danieljoos/wincred v1.0.2/go.mod h1:SnuYRW9lp1oJrZX/dXJqr0cPK5gYXqx3E github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1 h1:CaO/zOnF8VvUfEbhRatPcwKVWamvbYd8tQGRWacE9kU= github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1/go.mod h1:+hnT3ywWDTAFrW5aE+u2Sa/wT555ZqwoCS+pk3p6ry4= github.com/dimfeld/httptreemux v5.0.1+incompatible h1:Qj3gVcDNoOthBAqftuD596rm4wg/adLLz5xh5CmpiCA= github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0= diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go index 5882dd4ff4..ee8ff50a24 100644 --- a/pulsar/client_impl.go +++ b/pulsar/client_impl.go @@ -123,8 +123,10 @@ func newClient(options ClientOptions) (Client, error) { log: logger, metrics: metrics, } - c.rpcClient = internal.NewRPCClient(url, c.cnxPool, operationTimeout, logger, metrics) - c.lookupService = internal.NewLookupService(c.rpcClient, url, tlsConfig != nil, logger, metrics) + serviceNameResolver := internal.NewPulsarServiceNameResolver(url) + + c.rpcClient = internal.NewRPCClient(url, serviceNameResolver, c.cnxPool, operationTimeout, logger, metrics) + c.lookupService = internal.NewLookupService(c.rpcClient, url, serviceNameResolver, tlsConfig != nil, logger, metrics) c.handlers = internal.NewClientHandlers() return c, nil diff --git a/pulsar/internal/lookup_service.go b/pulsar/internal/lookup_service.go index 65fa383737..313a9f5026 100644 --- a/pulsar/internal/lookup_service.go +++ b/pulsar/internal/lookup_service.go @@ -46,18 +46,18 @@ type LookupService interface { type lookupService struct { rpcClient RPCClient - serviceURL *url.URL + serviceNameResolver ServiceNameResolver tlsEnabled bool log log.Logger metrics *Metrics } // NewLookupService init a lookup service struct and return an object of LookupService. -func NewLookupService(rpcClient RPCClient, serviceURL *url.URL, +func NewLookupService(rpcClient RPCClient, serviceURL *url.URL, serviceNameResolver ServiceNameResolver, tlsEnabled bool, logger log.Logger, metrics *Metrics) LookupService { return &lookupService{ rpcClient: rpcClient, - serviceURL: serviceURL, + serviceNameResolver: serviceNameResolver, tlsEnabled: tlsEnabled, log: logger.SubLogger(log.Fields{"serviceURL": serviceURL}), metrics: metrics, @@ -78,7 +78,10 @@ func (ls *lookupService) getBrokerAddress(lr *pb.CommandLookupTopicResponse) (lo var physicalAddr *url.URL if lr.GetProxyThroughServiceUrl() { - physicalAddr = ls.serviceURL + physicalAddr, err = ls.serviceNameResolver.ResolveHost() + if err != nil { + return nil, nil, err + } } else { physicalAddr = logicalAddress } diff --git a/pulsar/internal/lookup_service_test.go b/pulsar/internal/lookup_service_test.go index ad0e1bd6d3..a3f9bfe150 100644 --- a/pulsar/internal/lookup_service_test.go +++ b/pulsar/internal/lookup_service_test.go @@ -112,6 +112,7 @@ func responseType(r pb.CommandLookupTopicResponse_LookupType) *pb.CommandLookupT func TestLookupSuccess(t *testing.T) { url, err := url.Parse("pulsar://example:6650") assert.NoError(t, err) + serviceNameResolver := NewPulsarServiceNameResolver(url) ls := NewLookupService(&mockedLookupRPCClient{ t: t, @@ -131,7 +132,7 @@ func TestLookupSuccess(t *testing.T) { BrokerServiceUrl: proto.String("pulsar://broker-1:6650"), }, }, - }, url, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{})) + }, url, serviceNameResolver, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{})) lr, err := ls.Lookup("my-topic") assert.NoError(t, err) @@ -144,6 +145,7 @@ func TestLookupSuccess(t *testing.T) { func TestTlsLookupSuccess(t *testing.T) { url, err := url.Parse("pulsar+ssl://example:6651") assert.NoError(t, err) + serviceNameResolver := NewPulsarServiceNameResolver(url) ls := NewLookupService(&mockedLookupRPCClient{ t: t, @@ -163,7 +165,7 @@ func TestTlsLookupSuccess(t *testing.T) { BrokerServiceUrlTls: proto.String("pulsar+ssl://broker-1:6651"), }, }, - }, url, true, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{})) + }, url, serviceNameResolver, true, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{})) lr, err := ls.Lookup("my-topic") assert.NoError(t, err) @@ -176,6 +178,7 @@ func TestTlsLookupSuccess(t *testing.T) { func TestLookupWithProxy(t *testing.T) { url, err := url.Parse("pulsar://example:6650") assert.NoError(t, err) + serviceNameResolver := NewPulsarServiceNameResolver(url) ls := NewLookupService(&mockedLookupRPCClient{ t: t, @@ -196,7 +199,7 @@ func TestLookupWithProxy(t *testing.T) { ProxyThroughServiceUrl: proto.Bool(true), }, }, - }, url, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{})) + }, url, serviceNameResolver, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{})) lr, err := ls.Lookup("my-topic") assert.NoError(t, err) @@ -229,7 +232,7 @@ func TestTlsLookupWithProxy(t *testing.T) { ProxyThroughServiceUrl: proto.Bool(true), }, }, - }, url, true, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{})) + }, url, NewPulsarServiceNameResolver(url), true, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{})) lr, err := ls.Lookup("my-topic") assert.NoError(t, err) @@ -273,7 +276,7 @@ func TestLookupWithRedirect(t *testing.T) { BrokerServiceUrl: proto.String("pulsar://broker-1:6650"), }, }, - }, url, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{})) + }, url, NewPulsarServiceNameResolver(url), false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{})) lr, err := ls.Lookup("my-topic") assert.NoError(t, err) @@ -317,7 +320,7 @@ func TestTlsLookupWithRedirect(t *testing.T) { BrokerServiceUrlTls: proto.String("pulsar+ssl://broker-1:6651"), }, }, - }, url, true, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{})) + }, url, NewPulsarServiceNameResolver(url), true, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{})) lr, err := ls.Lookup("my-topic") assert.NoError(t, err) @@ -350,7 +353,7 @@ func TestLookupWithInvalidUrlResponse(t *testing.T) { ProxyThroughServiceUrl: proto.Bool(false), }, }, - }, url, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{})) + }, url, NewPulsarServiceNameResolver(url), false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{})) lr, err := ls.Lookup("my-topic") assert.Error(t, err) @@ -378,7 +381,7 @@ func TestLookupWithLookupFailure(t *testing.T) { Authoritative: proto.Bool(true), }, }, - }, url, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{})) + }, url, NewPulsarServiceNameResolver(url), false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{})) lr, err := ls.Lookup("my-topic") assert.Error(t, err) @@ -447,6 +450,7 @@ func (m mockedPartitionedTopicMetadataRPCClient) RequestOnCnx(cnx Connection, re func TestGetPartitionedTopicMetadataSuccess(t *testing.T) { url, err := url.Parse("pulsar://example:6650") assert.NoError(t, err) + serviceNameResolver := NewPulsarServiceNameResolver(url) ls := NewLookupService(&mockedPartitionedTopicMetadataRPCClient{ t: t, @@ -464,10 +468,43 @@ func TestGetPartitionedTopicMetadataSuccess(t *testing.T) { Response: pb.CommandPartitionedTopicMetadataResponse_Success.Enum(), }, }, - }, url, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{})) + }, url, serviceNameResolver, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{})) metadata, err := ls.GetPartitionedTopicMetadata("my-topic") assert.NoError(t, err) assert.NotNil(t, metadata) assert.Equal(t, metadata.GetPartitions(), uint32(1)) } + +func TestLookupSuccessWithMultipleHosts(t *testing.T) { + url, err := url.Parse("pulsar://host1,host2,host3:6650") + assert.NoError(t, err) + serviceNameResolver := NewPulsarServiceNameResolver(url) + + ls := NewLookupService(&mockedLookupRPCClient{ + t: t, + + expectedRequests: []pb.CommandLookupTopic{ + { + RequestId: proto.Uint64(1), + Topic: proto.String("my-topic"), + Authoritative: proto.Bool(false), + }, + }, + mockedResponses: []pb.CommandLookupTopicResponse{ + { + RequestId: proto.Uint64(1), + Response: responseType(pb.CommandLookupTopicResponse_Connect), + Authoritative: proto.Bool(true), + BrokerServiceUrl: proto.String("pulsar://broker-1:6650"), + }, + }, + }, url, serviceNameResolver, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{})) + + lr, err := ls.Lookup("my-topic") + assert.NoError(t, err) + assert.NotNil(t, lr) + + assert.Equal(t, "pulsar://broker-1:6650", lr.LogicalAddr.String()) + assert.Equal(t, "pulsar://broker-1:6650", lr.PhysicalAddr.String()) +} diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go index 0bb482c153..f8372c4654 100644 --- a/pulsar/internal/rpc_client.go +++ b/pulsar/internal/rpc_client.go @@ -56,7 +56,7 @@ type RPCClient interface { } type rpcClient struct { - serviceURL *url.URL + serviceNameResolver ServiceNameResolver pool ConnectionPool requestTimeout time.Duration requestIDGenerator uint64 @@ -66,10 +66,10 @@ type rpcClient struct { metrics *Metrics } -func NewRPCClient(serviceURL *url.URL, pool ConnectionPool, +func NewRPCClient(serviceURL *url.URL, serviceNameResolver ServiceNameResolver, pool ConnectionPool, requestTimeout time.Duration, logger log.Logger, metrics *Metrics) RPCClient { return &rpcClient{ - serviceURL: serviceURL, + serviceNameResolver: serviceNameResolver, pool: pool, requestTimeout: requestTimeout, log: logger.SubLogger(log.Fields{"serviceURL": serviceURL}), @@ -79,8 +79,12 @@ func NewRPCClient(serviceURL *url.URL, pool ConnectionPool, func (c *rpcClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) { - - rpcResult, err := c.Request(c.serviceURL, c.serviceURL, requestID, cmdType, message) + host, err := c.serviceNameResolver.ResolveHost() + if err != nil { + c.log.Errorf("request host resolve failed with error: {%v}", err) + return nil, err + } + rpcResult, err := c.Request(host, host, requestID, cmdType, message) if _, ok := err.(net.Error); ok { // We can retry this kind of requests over a connection error because they're // not specific to a particular broker. @@ -92,8 +96,12 @@ func (c *rpcClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_ retryTime = backoff.Next() c.log.Debugf("Retrying request in {%v} with timeout in {%v}", retryTime, c.requestTimeout) time.Sleep(retryTime) - - rpcResult, err = c.Request(c.serviceURL, c.serviceURL, requestID, cmdType, message) + host, err := c.serviceNameResolver.ResolveHost() + if err != nil { + c.log.Errorf("Retrying request host resolve failed with error: {%v}", err) + continue + } + rpcResult, err = c.Request(host, host, requestID, cmdType, message) if _, ok := err.(net.Error); ok { continue } else { diff --git a/pulsar/internal/service_name_resolver.go b/pulsar/internal/service_name_resolver.go index 218c0e0b2d..b67f5197bd 100644 --- a/pulsar/internal/service_name_resolver.go +++ b/pulsar/internal/service_name_resolver.go @@ -1,3 +1,118 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + package internal +import ( + "errors" + "fmt" + log "github.com/sirupsen/logrus" + "math/rand" + "net/url" + "sync/atomic" + "time" +) + // Map: PulsarServiceNameResolver.java + +type ServiceNameResolver interface { + ResolveHost() (*url.URL, error) + ResolveHostUri() (*PulsarServiceURI, error) + UpdateServiceUrl(url *url.URL) error + GetServiceURI() *PulsarServiceURI + GetServiceURL() *url.URL + GetAddressList() []*url.URL +} + +type PulsarServiceNameResolver struct { + ServiceURI *PulsarServiceURI + ServiceURL *url.URL + CurrentIndex int32 + AddressList []*url.URL +} + +func NewPulsarServiceNameResolver(url *url.URL) ServiceNameResolver { + r := &PulsarServiceNameResolver{} + err := r.UpdateServiceUrl(url) + if err != nil { + log.Errorf("create pulsar service name resolver failed : %v", err) + } + return r +} + +func (r *PulsarServiceNameResolver) ResolveHost() (*url.URL, error) { + if r.AddressList == nil { + return nil, errors.New("no service url is provided yet") + } + if len(r.AddressList) == 0 { + return nil, errors.New(fmt.Sprintf("no hosts found for service url : %v", r.ServiceURL)) + } + if len(r.AddressList) == 1 { + return r.AddressList[0], nil + } else { + idx := (r.CurrentIndex + 1) % int32(len(r.AddressList)) + atomic.StoreInt32(&r.CurrentIndex, idx) + return r.AddressList[idx], nil + } +} + +func (r *PulsarServiceNameResolver) ResolveHostUri() (*PulsarServiceURI, error) { + host, err := r.ResolveHost() + if err != nil { + return nil, err + } + hostUrl := host.Scheme + "://" + host.Hostname() + ":" + host.Port() + return NewPulsarServiceURIFromUriString(hostUrl) +} + +func (r *PulsarServiceNameResolver) UpdateServiceUrl(u *url.URL) error { + uri, err := NewPulsarServiceURIFromUrl(u) + if err != nil { + log.Errorf("invalid service-url instance %s provided %v", u, err) + return err + } + + hosts := uri.ServiceHosts + var addresses []*url.URL + for _, host := range hosts { + hostUrl := uri.URL.Scheme + "://" + host + u, err := url.Parse(hostUrl) + if err != nil { + log.Errorf("invalid host-url %s provided %v", hostUrl, err) + return err + } + addresses = append(addresses, u) + } + r.AddressList = addresses + r.ServiceURL = u + r.ServiceURI = uri + rand.Seed(time.Now().Unix()) // initialize global pseudo random generator + atomic.StoreInt32(&r.CurrentIndex, int32(rand.Intn(len(addresses)))) + return nil +} + +func (r *PulsarServiceNameResolver) GetServiceURI() *PulsarServiceURI { + return r.ServiceURI +} + +func (r *PulsarServiceNameResolver) GetServiceURL() *url.URL { + return r.ServiceURL +} + +func (r *PulsarServiceNameResolver) GetAddressList() []*url.URL { + return r.AddressList +} diff --git a/pulsar/internal/service_name_resolver_test.go b/pulsar/internal/service_name_resolver_test.go new file mode 100644 index 0000000000..ba6b96f2b2 --- /dev/null +++ b/pulsar/internal/service_name_resolver_test.go @@ -0,0 +1,130 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package internal + +import ( + "github.com/stretchr/testify/assert" + "net/url" + "testing" +) + +func TestResolveBeforeUpdateServiceUrl(t *testing.T) { + resolver := NewPulsarServiceNameResolver(nil) + u, err := resolver.ResolveHost() + assert.Nil(t, u) + assert.NotNil(t, err) + assert.EqualError(t, err, "no service url is provided yet") +} + +func TestResolveUriBeforeUpdateServiceUrl(t *testing.T) { + resolver := NewPulsarServiceNameResolver(nil) + u, err := resolver.ResolveHostUri() + assert.Nil(t, u) + assert.NotNil(t, err) + assert.EqualError(t, err, "no service url is provided yet") +} + +func TestUpdateInvalidServiceUrl(t *testing.T) { + resolver := NewPulsarServiceNameResolver(nil) + url, _ := url.Parse("pulsar:///") + err := resolver.UpdateServiceUrl(url) + assert.NotNil(t, err) + assert.Empty(t, resolver.GetServiceURL()) + assert.Nil(t, resolver.GetServiceURI()) +} + +func TestSimpleHostUrl(t *testing.T) { + resolver := NewPulsarServiceNameResolver(nil) + serviceUrl, _ := url.Parse("pulsar://host1:6650") + err := resolver.UpdateServiceUrl(serviceUrl) + assert.Nil(t, err) + assert.Equal(t, serviceUrl, resolver.GetServiceURL()) + expectedUri, err := NewPulsarServiceURIFromUrl(serviceUrl) + assert.Nil(t, err) + assert.Equal(t, expectedUri, resolver.GetServiceURI()) + actualHost, err := resolver.ResolveHost() + assert.Nil(t, err) + assert.Equal(t, "host1", actualHost.Hostname()) + assert.Equal(t, "6650", actualHost.Port()) + + newServiceUrl, _ := url.Parse("pulsar://host2:6650") + err = resolver.UpdateServiceUrl(newServiceUrl) + assert.Nil(t, err) + assert.Equal(t, newServiceUrl, resolver.GetServiceURL()) + expectedUri, err = NewPulsarServiceURIFromUrl(newServiceUrl) + assert.Nil(t, err) + assert.Equal(t, expectedUri, resolver.GetServiceURI()) + actualHost, err = resolver.ResolveHost() + assert.Nil(t, err) + assert.Equal(t, "host2", actualHost.Hostname()) + assert.Equal(t, "6650", actualHost.Port()) +} + +func TestMultipleHostsUrl(t *testing.T) { + resolver := NewPulsarServiceNameResolver(nil) + serviceUrl, _ := url.Parse("pulsar://host1:6650,host2:6650") + err := resolver.UpdateServiceUrl(serviceUrl) + assert.Nil(t, err) + assert.Equal(t, serviceUrl, resolver.GetServiceURL()) + expectedUri, err := NewPulsarServiceURIFromUrl(serviceUrl) + assert.Nil(t, err) + assert.Equal(t, expectedUri, resolver.GetServiceURI()) + host1, _ := url.Parse("pulsar://host1:6650") + host2, _ := url.Parse("pulsar://host2:6650") + host1uri, _ := NewPulsarServiceURIFromUriString("pulsar://host1:6650") + host2uri, _ := NewPulsarServiceURIFromUriString("pulsar://host2:6650") + assert.Contains(t, resolver.GetAddressList(), host1) + assert.Contains(t, resolver.GetAddressList(), host2) + hosts := []*url.URL{host1, host2} + hosturis := []*PulsarServiceURI{host1uri, host2uri} + for i := 0; i<10; i++{ + host, err := resolver.ResolveHost() + assert.Nil(t, err) + hosturi, err := resolver.ResolveHostUri() + assert.Nil(t, err) + assert.Contains(t, hosts, host) + assert.Contains(t, hosturis, hosturi) + } +} + +func TestMultipleHostsTlsUrl(t *testing.T) { + resolver := NewPulsarServiceNameResolver(nil) + serviceUrl, _ := url.Parse("pulsar+ssl://host1:6651,host2:6651") + err := resolver.UpdateServiceUrl(serviceUrl) + assert.Nil(t, err) + assert.Equal(t, serviceUrl, resolver.GetServiceURL()) + expectedUri, err := NewPulsarServiceURIFromUrl(serviceUrl) + assert.Nil(t, err) + assert.Equal(t, expectedUri, resolver.GetServiceURI()) + host1, _ := url.Parse("pulsar+ssl://host1:6651") + host2, _ := url.Parse("pulsar+ssl://host2:6651") + host1uri, _ := NewPulsarServiceURIFromUriString("pulsar+ssl://host1:6651") + host2uri, _ := NewPulsarServiceURIFromUriString("pulsar+ssl://host2:6651") + assert.Contains(t, resolver.GetAddressList(), host1) + assert.Contains(t, resolver.GetAddressList(), host2) + hosts := []*url.URL{host1, host2} + hosturis := []*PulsarServiceURI{host1uri, host2uri} + for i := 0; i<10; i++{ + host, err := resolver.ResolveHost() + assert.Nil(t, err) + hosturi, err := resolver.ResolveHostUri() + assert.Nil(t, err) + assert.Contains(t, hosts, host) + assert.Contains(t, hosturis, hosturi) + } +} \ No newline at end of file diff --git a/pulsar/internal/service_uri.go b/pulsar/internal/service_uri.go index d94322ede7..08046ad9fd 100644 --- a/pulsar/internal/service_uri.go +++ b/pulsar/internal/service_uri.go @@ -1,3 +1,210 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + package internal +import ( + "errors" + "fmt" + log "github.com/sirupsen/logrus" + "net" + "net/url" + "strings" +) + +const ( + BinaryService = "pulsar" + HttpService = "http" + HttpsService = "https" + SSLService = "ssl" + BinaryPort = 6650 + BinaryTLSPort = 6651 + HttpPort = 80 + HttpsPort = 443 +) // map to ServiceURI.java +type PulsarServiceURI struct { + ServiceName string + ServiceInfos []string + ServiceHosts []string + servicePath string + URL *url.URL +} + +func NewPulsarServiceURIFromUriString(uri string) (*PulsarServiceURI, error) { + u, err := fromString(uri) + if err != nil { + log.Error(err) + return nil, err + } + return u, nil +} + +func NewPulsarServiceURIFromUrl(url *url.URL) (*PulsarServiceURI, error) { + u, err := fromURL(url) + if err != nil { + log.Error(err) + return nil, err + } + return u, nil +} + +func fromString(uriStr string) (*PulsarServiceURI, error) { + if uriStr == "" || len(uriStr) == 0 { + return nil, errors.New("service uriStr string is null") + } + if strings.Contains(uriStr, "[") && strings.Contains(uriStr, "]") { + // deal with ipv6 address + hosts := strings.FieldsFunc(uriStr, splitUri) + if len(hosts) > 1 { + // deal with ipv6 address + firstHost := hosts[0] + lastHost := hosts[len(hosts)-1] + hasPath := strings.Contains(lastHost, "/") + path := "" + if hasPath { + idx := strings.Index(lastHost, "/") + path = lastHost[idx:] + } + firstHost += path + url, err := url.Parse(firstHost) + if err != nil { + return nil, err + } + serviceURI, err := fromURL(url) + if err != nil { + return nil, err + } + var mHosts []string + var multiHosts []string + mHosts = append(mHosts, serviceURI.ServiceHosts[0]) + mHosts = append(mHosts, hosts[1:]...) + + for _, v := range mHosts { + h, err := validateHostName(serviceURI.ServiceName, serviceURI.ServiceInfos, v) + if err == nil { + multiHosts = append(multiHosts, h) + } else { + return nil, err + } + } + + return &PulsarServiceURI{ + serviceURI.ServiceName, + serviceURI.ServiceInfos, + multiHosts, + serviceURI.servicePath, + serviceURI.URL, + }, nil + } + } + + url, err := url.Parse(uriStr) + if err != nil { + return nil, err + } + + return fromURL(url) +} + +func fromURL(url *url.URL) (*PulsarServiceURI, error) { + if url == nil { + return nil, errors.New("service url instance is null") + } + + if url.Host == "" || len(url.Host) == 0 { + return nil, errors.New("service host is null") + } + + var serviceName string + var serviceInfos []string + scheme := url.Scheme + if scheme != "" { + scheme = strings.ToLower(scheme) + schemeParts := strings.Split(scheme, "+") + serviceName = schemeParts[0] + serviceInfos = schemeParts[1:] + } + + var serviceHosts []string + hosts := strings.FieldsFunc(url.Host, splitUri) + for _, v := range hosts { + h, err := validateHostName(serviceName, serviceInfos, v) + if err == nil { + serviceHosts = append(serviceHosts, h) + } else { + return nil, err + } + } + + return &PulsarServiceURI{ + serviceName, + serviceInfos, + serviceHosts, + url.Path, + url, + }, nil +} + +func splitUri(r rune) bool { + return r == ',' || r == ';' +} + +func validateHostName(serviceName string, serviceInfos []string, hostname string) (string, error) { + uri, err := url.Parse("dummyscheme://" + hostname) + if err != nil { + return "", err + } + host := uri.Hostname() + if strings.Contains(hostname, "[") && strings.Contains(hostname, "]") { + host = fmt.Sprintf("[%s]", host) + } + if host == "" || uri.Scheme == "" { + return "", errors.New("Invalid hostname : " + hostname) + } + + port := uri.Port() + if uri.Port() == "" { + p := getServicePort(serviceName, serviceInfos) + if p == -1 { + return "", errors.New(fmt.Sprintf("Invalid port : %d", p)) + } + port = fmt.Sprint(p) + } + result := host + ":" + port + _, _, err = net.SplitHostPort(result) + if err != nil { + return "", err + } + return result, nil +} + +func getServicePort(serviceName string, serviceInfos []string) int { + switch strings.ToLower(serviceName) { + case BinaryService: + if len(serviceInfos) == 0 { + return BinaryPort + } else if len(serviceInfos) == 1 && strings.ToLower(serviceInfos[0]) == SSLService { + return BinaryTLSPort + } + case HttpService: + return HttpPort + case HttpsService: + return HttpsPort + } + return -1 +} \ No newline at end of file diff --git a/pulsar/internal/service_uri_test.go b/pulsar/internal/service_uri_test.go new file mode 100644 index 0000000000..a54bce7085 --- /dev/null +++ b/pulsar/internal/service_uri_test.go @@ -0,0 +1,156 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package internal + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestInvalidServiceUris(t *testing.T) { + uris := []string{ + "://localhost:6650", // missing scheme + "pulsar:///", // missing authority + "pulsar://localhost:6650:6651/", // invalid hostname pair + "pulsar://localhost:xyz/", // invalid port + "pulsar://localhost:-6650/", // negative port + "pulsar://fec0:0:0:ffff::1:6650", // missing brackets + } + + for _, uri := range uris { + testInvalidServiceUri(t, uri) + } +} + +func TestEmptyServiceUriString(t *testing.T) { + u, err := NewPulsarServiceURIFromUriString("") + assert.Nil(t, u) + assert.NotNil(t, err) +} + +func TestNullServiceUrlInstance(t *testing.T) { + u, err := NewPulsarServiceURIFromUrl(nil) + assert.Nil(t, u) + assert.NotNil(t, err) +} + +func TestMissingServiceName(t *testing.T) { + serviceUri := "//localhost:6650/path/to/namespace" + assertServiceUri(t, serviceUri, "", nil, []string{"localhost:6650"}, "/path/to/namespace", "") +} + +func TestEmptyPath(t *testing.T) { + serviceUri := "pulsar://localhost:6650" + assertServiceUri(t, serviceUri, "pulsar", nil, []string{"localhost:6650"}, "", "") +} + +func TestRootPath(t *testing.T) { + serviceUri := "pulsar://localhost:6650/" + assertServiceUri(t, serviceUri, "pulsar", nil, []string{"localhost:6650"}, "/", "") +} + +func TestUserInfo(t *testing.T) { + serviceUri := "pulsar://pulsaruser@localhost:6650/path/to/namespace" + assertServiceUri(t, serviceUri, "pulsar", nil, []string{"localhost:6650"}, "/path/to/namespace", "pulsaruser") +} + +func TestIpv6Uri(t *testing.T) { + serviceUri := "pulsar://pulsaruser@[fec0:0:0:ffff::1]:6650/path/to/namespace" + assertServiceUri(t, serviceUri, "pulsar", nil, []string{"[fec0:0:0:ffff::1]:6650"}, "/path/to/namespace", "pulsaruser") +} + +func TestIpv6UriWithoutPulsarPort(t *testing.T) { + serviceUri := "pulsar://[fec0:0:0:ffff::1]/path/to/namespace" + assertServiceUri(t, serviceUri, "pulsar", nil, []string{"[fec0:0:0:ffff::1]:6650"}, "/path/to/namespace", "") +} + +func TestMultiIpv6Uri(t *testing.T) { + serviceUri := "pulsar://pulsaruser@[fec0:0:0:ffff::1]:6650,[fec0:0:0:ffff::2]:6650;[fec0:0:0:ffff::3]:6650/path/to/namespace" + assertServiceUri(t, serviceUri, "pulsar", nil, []string{"[fec0:0:0:ffff::1]:6650", "[fec0:0:0:ffff::2]:6650", "[fec0:0:0:ffff::3]:6650"}, "/path/to/namespace", "pulsaruser") +} + +func TestMultiIpv6UriWithoutPulsarPort(t *testing.T) { + serviceUri := "pulsar://pulsaruser@[fec0:0:0:ffff::1],[fec0:0:0:ffff::2];[fec0:0:0:ffff::3]/path/to/namespace" + assertServiceUri(t, serviceUri, "pulsar", nil, []string{"[fec0:0:0:ffff::1]:6650", "[fec0:0:0:ffff::2]:6650", "[fec0:0:0:ffff::3]:6650"}, "/path/to/namespace", "pulsaruser") +} + +func TestMultipleHostsSemiColon(t *testing.T) { + serviceUri := "pulsar://host1:6650;host2:6650;host3:6650/path/to/namespace" + assertServiceUri(t, serviceUri, "pulsar", nil, []string{"host1:6650", "host2:6650", "host3:6650"}, "/path/to/namespace", "") +} + +func TestMultipleHostsComma(t *testing.T) { + serviceUri := "pulsar://host1:6650,host2:6650,host3:6650/path/to/namespace" + assertServiceUri(t, serviceUri, "pulsar", nil, []string{"host1:6650", "host2:6650", "host3:6650"}, "/path/to/namespace", "") +} + +func TestMultipleHostsWithoutPulsarPorts(t *testing.T) { + serviceUri := "pulsar://host1,host2,host3/path/to/namespace" + assertServiceUri(t, serviceUri, "pulsar", nil, []string{"host1:6650", "host2:6650", "host3:6650"}, "/path/to/namespace", "") +} + +func TestMultipleHostsWithoutPulsarTlsPorts(t *testing.T) { + serviceUri := "pulsar+ssl://host1,host2,host3/path/to/namespace" + assertServiceUri(t, serviceUri, "pulsar", []string{"ssl"}, []string{"host1:6651", "host2:6651", "host3:6651"}, "/path/to/namespace", "") +} + +func TestMultipleHostsWithoutHttpPorts(t *testing.T) { + serviceUri := "http://host1,host2,host3/path/to/namespace" + assertServiceUri(t, serviceUri, "http", nil, []string{"host1:80", "host2:80", "host3:80"}, "/path/to/namespace", "") +} + +func TestMultipleHostsWithoutHttpsPorts(t *testing.T) { + serviceUri := "https://host1,host2,host3/path/to/namespace" + assertServiceUri(t, serviceUri, "https", nil, []string{"host1:443", "host2:443", "host3:443"}, "/path/to/namespace", "") +} + +func TestMultipleHostsMixedPorts(t *testing.T) { + serviceUri := "pulsar://host1:6640,host2:6650,host3:6660/path/to/namespace" + assertServiceUri(t, serviceUri, "pulsar", nil, []string{"host1:6640", "host2:6650", "host3:6660"}, "/path/to/namespace", "") +} + +func TestMultipleHostsMixed(t *testing.T) { + serviceUri := "pulsar://host1:6640,host2,host3:6660/path/to/namespace" + assertServiceUri(t, serviceUri, "pulsar", nil, []string{"host1:6640", "host2:6650", "host3:6660"}, "/path/to/namespace", "") +} + +func TestUserInfoWithMultipleHosts(t *testing.T) { + serviceUri := "pulsar://pulsaruser@host1:6650;host2:6650;host3:6650/path/to/namespace" + assertServiceUri(t, serviceUri, "pulsar", nil, []string{"host1:6650", "host2:6650", "host3:6650"}, "/path/to/namespace", "pulsaruser") +} + +func testInvalidServiceUri(t *testing.T, serviceUri string) { + u, err := NewPulsarServiceURIFromUriString(serviceUri) + t.Logf("testInvalidServiceUri %s", serviceUri) + assert.Nil(t, u) + assert.NotNil(t, err) +} + +func assertServiceUri(t *testing.T, serviceUri, expectedServiceName string, + expectedServiceInfo, expectedServiceHosts []string, expectedServicePath, expectedServiceUser string) { + uri, err := NewPulsarServiceURIFromUriString(serviceUri) + assert.Nil(t, err) + assert.NotNil(t, serviceUri) + assert.Equal(t, expectedServiceName, uri.ServiceName) + assert.Equal(t, expectedServicePath, uri.servicePath) + if expectedServiceUser != "" { + assert.Equal(t, expectedServiceUser, uri.URL.User.Username()) + } + assert.ElementsMatch(t, expectedServiceInfo, uri.ServiceInfos) + assert.ElementsMatch(t, expectedServiceHosts, uri.ServiceHosts) +} \ No newline at end of file diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 5708cad581..4d62cac169 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -621,7 +621,7 @@ func TestProducerMetadata(t *testing.T) { } producer, err := client.CreateProducer(ProducerOptions{ Topic: topic, - Name: "my-producer", + Name: "meta-data-producer", Properties: props, }) if err != nil { From bb92e82d35254761de1100101568fbe7af4bb5ac Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Fri, 5 Mar 2021 16:04:53 +0800 Subject: [PATCH 3/7] fix producer --- pulsar/client_impl_test.go | 56 ++++++++++++++++++++++++++++++ pulsar/internal/connection_pool.go | 4 +++ pulsar/internal/rpc_client.go | 7 ++-- 3 files changed, 63 insertions(+), 4 deletions(-) diff --git a/pulsar/client_impl_test.go b/pulsar/client_impl_test.go index 661eac2765..2023e129f8 100644 --- a/pulsar/client_impl_test.go +++ b/pulsar/client_impl_test.go @@ -18,6 +18,7 @@ package pulsar import ( + "context" "crypto/tls" "fmt" "io/ioutil" @@ -458,3 +459,58 @@ func anonymousNamespacePolicy() map[string]interface{} { "replication_clusters": []string{"standalone"}, } } + +func TestRetryWithMultipleHosts(t *testing.T) { + // Multi hosts included an unreached port and the actual port for verify retry logic + client, err := NewClient(ClientOptions{ + URL: "pulsar://localhost:6600,localhost:6650", + }) + + assert.Nil(t, err) + defer client.Close() + + topic := "persistent://public/default/retry-multiple-hosts-" + generateRandomName() + + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + }) + + assert.Nil(t, err) + defer producer.Close() + + ctx := context.Background() + var msgIds [][]byte + + for i := 0; i < 10; i++ { + if msgId, err := producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + }); err != nil { + assert.Nil(t, err) + } else { + assert.NotNil(t, msgId) + msgIds = append(msgIds, msgId.Serialize()) + } + } + + assert.Equal(t, 10, len(msgIds)) + + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "retry-multi-hosts-sub", + Type: Shared, + SubscriptionInitialPosition: SubscriptionPositionEarliest, + }) + assert.Nil(t, err) + defer consumer.Close() + + for i := 0; i < 10; i++ { + msg, err := consumer.Receive(context.Background()) + assert.Nil(t, err) + assert.Contains(t, msgIds, msg.ID().Serialize()) + consumer.Ack(msg) + } + + err = consumer.Unsubscribe() + assert.Nil(t, err) + +} diff --git a/pulsar/internal/connection_pool.go b/pulsar/internal/connection_pool.go index 54d30b3815..29e126723d 100644 --- a/pulsar/internal/connection_pool.go +++ b/pulsar/internal/connection_pool.go @@ -103,6 +103,10 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.U } if err := cnx.waitUntilReady(); err != nil { + if !wasCached { + p.pool.Delete(key) + p.log.Debug("Removed failed connection from pool:", cnx.logicalAddr, cnx.physicalAddr) + } return nil, err } return cnx, nil diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go index f8372c4654..7cd5bce0eb 100644 --- a/pulsar/internal/rpc_client.go +++ b/pulsar/internal/rpc_client.go @@ -85,7 +85,7 @@ func (c *rpcClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_ return nil, err } rpcResult, err := c.Request(host, host, requestID, cmdType, message) - if _, ok := err.(net.Error); ok { + if _, ok := err.(net.Error); ok || (err != nil && err.Error() == "connection error") { // We can retry this kind of requests over a connection error because they're // not specific to a particular broker. backoff := Backoff{100 * time.Millisecond} @@ -96,13 +96,13 @@ func (c *rpcClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_ retryTime = backoff.Next() c.log.Debugf("Retrying request in {%v} with timeout in {%v}", retryTime, c.requestTimeout) time.Sleep(retryTime) - host, err := c.serviceNameResolver.ResolveHost() + host, err = c.serviceNameResolver.ResolveHost() if err != nil { c.log.Errorf("Retrying request host resolve failed with error: {%v}", err) continue } rpcResult, err = c.Request(host, host, requestID, cmdType, message) - if _, ok := err.(net.Error); ok { + if _, ok := err.(net.Error); ok || (err != nil && err.Error() == "connection error") { continue } else { // We either succeeded or encountered a non connection error @@ -110,7 +110,6 @@ func (c *rpcClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_ } } } - return rpcResult, err } From 6eff430a0f29d6fa28ef0bbf1fe471536a885754 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Fri, 5 Mar 2021 16:06:43 +0800 Subject: [PATCH 4/7] add reader with multi hosts test --- pulsar/reader_test.go | 53 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index cd97964a07..c64f4a8f1c 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -586,3 +586,56 @@ func TestReaderLatestInclusiveHasNext(t *testing.T) { assert.False(t, reader.HasNext()) } + + +func TestReaderWithMultiHosts(t *testing.T) { + // Multi hosts included an unreached port and the actual port for verify retry logic + client, err := NewClient(ClientOptions{ + URL: "pulsar://localhost:6600,localhost:6650", + }) + + assert.Nil(t, err) + defer client.Close() + + topic := newTopicName() + ctx := context.Background() + + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + DisableBatching: true, + }) + assert.Nil(t, err) + defer producer.Close() + + // send 10 messages + for i := 0; i < 10; i++ { + msgID, err := producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + }) + assert.NoError(t, err) + assert.NotNil(t, msgID) + } + + // create reader on 5th message (not included) + reader, err := client.CreateReader(ReaderOptions{ + Topic: topic, + StartMessageID: EarliestMessageID(), + }) + + assert.Nil(t, err) + defer reader.Close() + + i := 0 + for reader.HasNext() { + msg, err := reader.Next(context.Background()) + assert.NoError(t, err) + + expectMsg := fmt.Sprintf("hello-%d", i) + assert.Equal(t, []byte(expectMsg), msg.Payload()) + + i++ + } + + assert.Equal(t, 10, i) +} \ No newline at end of file From 65b50c45778cb80994460d10ff973107b3b9ada8 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Fri, 5 Mar 2021 16:31:52 +0800 Subject: [PATCH 5/7] lint --- pulsar/client_impl_test.go | 6 +-- pulsar/internal/lookup_service.go | 16 +++--- pulsar/internal/rpc_client.go | 8 +-- pulsar/internal/service_name_resolver.go | 20 +++---- pulsar/internal/service_name_resolver_test.go | 9 ++-- pulsar/internal/service_uri.go | 14 ++--- pulsar/internal/service_uri_test.go | 52 ++++++++++++------- pulsar/reader_test.go | 3 +- 8 files changed, 72 insertions(+), 56 deletions(-) diff --git a/pulsar/client_impl_test.go b/pulsar/client_impl_test.go index 2023e129f8..80e177d879 100644 --- a/pulsar/client_impl_test.go +++ b/pulsar/client_impl_test.go @@ -495,9 +495,9 @@ func TestRetryWithMultipleHosts(t *testing.T) { assert.Equal(t, 10, len(msgIds)) consumer, err := client.Subscribe(ConsumerOptions{ - Topic: topic, - SubscriptionName: "retry-multi-hosts-sub", - Type: Shared, + Topic: topic, + SubscriptionName: "retry-multi-hosts-sub", + Type: Shared, SubscriptionInitialPosition: SubscriptionPositionEarliest, }) assert.Nil(t, err) diff --git a/pulsar/internal/lookup_service.go b/pulsar/internal/lookup_service.go index 313a9f5026..5b4e2ac1a1 100644 --- a/pulsar/internal/lookup_service.go +++ b/pulsar/internal/lookup_service.go @@ -45,22 +45,22 @@ type LookupService interface { } type lookupService struct { - rpcClient RPCClient + rpcClient RPCClient serviceNameResolver ServiceNameResolver - tlsEnabled bool - log log.Logger - metrics *Metrics + tlsEnabled bool + log log.Logger + metrics *Metrics } // NewLookupService init a lookup service struct and return an object of LookupService. func NewLookupService(rpcClient RPCClient, serviceURL *url.URL, serviceNameResolver ServiceNameResolver, tlsEnabled bool, logger log.Logger, metrics *Metrics) LookupService { return &lookupService{ - rpcClient: rpcClient, + rpcClient: rpcClient, serviceNameResolver: serviceNameResolver, - tlsEnabled: tlsEnabled, - log: logger.SubLogger(log.Fields{"serviceURL": serviceURL}), - metrics: metrics, + tlsEnabled: tlsEnabled, + log: logger.SubLogger(log.Fields{"serviceURL": serviceURL}), + metrics: metrics, } } diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go index 7cd5bce0eb..b51e4e3d91 100644 --- a/pulsar/internal/rpc_client.go +++ b/pulsar/internal/rpc_client.go @@ -70,10 +70,10 @@ func NewRPCClient(serviceURL *url.URL, serviceNameResolver ServiceNameResolver, requestTimeout time.Duration, logger log.Logger, metrics *Metrics) RPCClient { return &rpcClient{ serviceNameResolver: serviceNameResolver, - pool: pool, - requestTimeout: requestTimeout, - log: logger.SubLogger(log.Fields{"serviceURL": serviceURL}), - metrics: metrics, + pool: pool, + requestTimeout: requestTimeout, + log: logger.SubLogger(log.Fields{"serviceURL": serviceURL}), + metrics: metrics, } } diff --git a/pulsar/internal/service_name_resolver.go b/pulsar/internal/service_name_resolver.go index b67f5197bd..a131577c9c 100644 --- a/pulsar/internal/service_name_resolver.go +++ b/pulsar/internal/service_name_resolver.go @@ -20,11 +20,12 @@ package internal import ( "errors" "fmt" - log "github.com/sirupsen/logrus" "math/rand" "net/url" "sync/atomic" "time" + + log "github.com/sirupsen/logrus" ) // Map: PulsarServiceNameResolver.java @@ -39,10 +40,10 @@ type ServiceNameResolver interface { } type PulsarServiceNameResolver struct { - ServiceURI *PulsarServiceURI - ServiceURL *url.URL + ServiceURI *PulsarServiceURI + ServiceURL *url.URL CurrentIndex int32 - AddressList []*url.URL + AddressList []*url.URL } func NewPulsarServiceNameResolver(url *url.URL) ServiceNameResolver { @@ -55,19 +56,18 @@ func NewPulsarServiceNameResolver(url *url.URL) ServiceNameResolver { } func (r *PulsarServiceNameResolver) ResolveHost() (*url.URL, error) { - if r.AddressList == nil { + if r.AddressList == nil { return nil, errors.New("no service url is provided yet") } if len(r.AddressList) == 0 { - return nil, errors.New(fmt.Sprintf("no hosts found for service url : %v", r.ServiceURL)) + return nil, fmt.Errorf("no hosts found for service url : %v", r.ServiceURL) } if len(r.AddressList) == 1 { return r.AddressList[0], nil - } else { - idx := (r.CurrentIndex + 1) % int32(len(r.AddressList)) - atomic.StoreInt32(&r.CurrentIndex, idx) - return r.AddressList[idx], nil } + idx := (r.CurrentIndex + 1) % int32(len(r.AddressList)) + atomic.StoreInt32(&r.CurrentIndex, idx) + return r.AddressList[idx], nil } func (r *PulsarServiceNameResolver) ResolveHostUri() (*PulsarServiceURI, error) { diff --git a/pulsar/internal/service_name_resolver_test.go b/pulsar/internal/service_name_resolver_test.go index ba6b96f2b2..c40c75467f 100644 --- a/pulsar/internal/service_name_resolver_test.go +++ b/pulsar/internal/service_name_resolver_test.go @@ -18,9 +18,10 @@ package internal import ( - "github.com/stretchr/testify/assert" "net/url" "testing" + + "github.com/stretchr/testify/assert" ) func TestResolveBeforeUpdateServiceUrl(t *testing.T) { @@ -92,7 +93,7 @@ func TestMultipleHostsUrl(t *testing.T) { assert.Contains(t, resolver.GetAddressList(), host2) hosts := []*url.URL{host1, host2} hosturis := []*PulsarServiceURI{host1uri, host2uri} - for i := 0; i<10; i++{ + for i := 0; i < 10; i++ { host, err := resolver.ResolveHost() assert.Nil(t, err) hosturi, err := resolver.ResolveHostUri() @@ -119,7 +120,7 @@ func TestMultipleHostsTlsUrl(t *testing.T) { assert.Contains(t, resolver.GetAddressList(), host2) hosts := []*url.URL{host1, host2} hosturis := []*PulsarServiceURI{host1uri, host2uri} - for i := 0; i<10; i++{ + for i := 0; i < 10; i++ { host, err := resolver.ResolveHost() assert.Nil(t, err) hosturi, err := resolver.ResolveHostUri() @@ -127,4 +128,4 @@ func TestMultipleHostsTlsUrl(t *testing.T) { assert.Contains(t, hosts, host) assert.Contains(t, hosturis, hosturi) } -} \ No newline at end of file +} diff --git a/pulsar/internal/service_uri.go b/pulsar/internal/service_uri.go index 08046ad9fd..c4181401a3 100644 --- a/pulsar/internal/service_uri.go +++ b/pulsar/internal/service_uri.go @@ -20,10 +20,11 @@ package internal import ( "errors" "fmt" - log "github.com/sirupsen/logrus" "net" "net/url" "strings" + + log "github.com/sirupsen/logrus" ) const ( @@ -36,13 +37,14 @@ const ( HttpPort = 80 HttpsPort = 443 ) + // map to ServiceURI.java type PulsarServiceURI struct { - ServiceName string + ServiceName string ServiceInfos []string ServiceHosts []string - servicePath string - URL *url.URL + servicePath string + URL *url.URL } func NewPulsarServiceURIFromUriString(uri string) (*PulsarServiceURI, error) { @@ -181,7 +183,7 @@ func validateHostName(serviceName string, serviceInfos []string, hostname string if uri.Port() == "" { p := getServicePort(serviceName, serviceInfos) if p == -1 { - return "", errors.New(fmt.Sprintf("Invalid port : %d", p)) + return "", fmt.Errorf("invalid port : %d", p) } port = fmt.Sprint(p) } @@ -207,4 +209,4 @@ func getServicePort(serviceName string, serviceInfos []string) int { return HttpsPort } return -1 -} \ No newline at end of file +} diff --git a/pulsar/internal/service_uri_test.go b/pulsar/internal/service_uri_test.go index a54bce7085..24b97ec25d 100644 --- a/pulsar/internal/service_uri_test.go +++ b/pulsar/internal/service_uri_test.go @@ -18,18 +18,19 @@ package internal import ( - "github.com/stretchr/testify/assert" "testing" + + "github.com/stretchr/testify/assert" ) func TestInvalidServiceUris(t *testing.T) { uris := []string{ - "://localhost:6650", // missing scheme - "pulsar:///", // missing authority - "pulsar://localhost:6650:6651/", // invalid hostname pair - "pulsar://localhost:xyz/", // invalid port - "pulsar://localhost:-6650/", // negative port - "pulsar://fec0:0:0:ffff::1:6650", // missing brackets + "://localhost:6650", // missing scheme + "pulsar:///", // missing authority + "pulsar://localhost:6650:6651/", // invalid hostname pair + "pulsar://localhost:xyz/", // invalid port + "pulsar://localhost:-6650/", // negative port + "pulsar://fec0:0:0:ffff::1:6650", // missing brackets } for _, uri := range uris { @@ -71,7 +72,8 @@ func TestUserInfo(t *testing.T) { func TestIpv6Uri(t *testing.T) { serviceUri := "pulsar://pulsaruser@[fec0:0:0:ffff::1]:6650/path/to/namespace" - assertServiceUri(t, serviceUri, "pulsar", nil, []string{"[fec0:0:0:ffff::1]:6650"}, "/path/to/namespace", "pulsaruser") + assertServiceUri(t, serviceUri, "pulsar", nil, []string{"[fec0:0:0:ffff::1]:6650"}, "/path/to/namespace", + "pulsaruser") } func TestIpv6UriWithoutPulsarPort(t *testing.T) { @@ -81,32 +83,40 @@ func TestIpv6UriWithoutPulsarPort(t *testing.T) { func TestMultiIpv6Uri(t *testing.T) { serviceUri := "pulsar://pulsaruser@[fec0:0:0:ffff::1]:6650,[fec0:0:0:ffff::2]:6650;[fec0:0:0:ffff::3]:6650/path/to/namespace" - assertServiceUri(t, serviceUri, "pulsar", nil, []string{"[fec0:0:0:ffff::1]:6650", "[fec0:0:0:ffff::2]:6650", "[fec0:0:0:ffff::3]:6650"}, "/path/to/namespace", "pulsaruser") + assertServiceUri(t, serviceUri, "pulsar", nil, + []string{"[fec0:0:0:ffff::1]:6650", "[fec0:0:0:ffff::2]:6650", "[fec0:0:0:ffff::3]:6650"}, "/path/to/namespace", + "pulsaruser") } func TestMultiIpv6UriWithoutPulsarPort(t *testing.T) { serviceUri := "pulsar://pulsaruser@[fec0:0:0:ffff::1],[fec0:0:0:ffff::2];[fec0:0:0:ffff::3]/path/to/namespace" - assertServiceUri(t, serviceUri, "pulsar", nil, []string{"[fec0:0:0:ffff::1]:6650", "[fec0:0:0:ffff::2]:6650", "[fec0:0:0:ffff::3]:6650"}, "/path/to/namespace", "pulsaruser") + assertServiceUri(t, serviceUri, "pulsar", nil, + []string{"[fec0:0:0:ffff::1]:6650", "[fec0:0:0:ffff::2]:6650", "[fec0:0:0:ffff::3]:6650"}, "/path/to/namespace", + "pulsaruser") } func TestMultipleHostsSemiColon(t *testing.T) { serviceUri := "pulsar://host1:6650;host2:6650;host3:6650/path/to/namespace" - assertServiceUri(t, serviceUri, "pulsar", nil, []string{"host1:6650", "host2:6650", "host3:6650"}, "/path/to/namespace", "") + assertServiceUri(t, serviceUri, "pulsar", nil, []string{"host1:6650", "host2:6650", "host3:6650"}, + "/path/to/namespace", "") } func TestMultipleHostsComma(t *testing.T) { serviceUri := "pulsar://host1:6650,host2:6650,host3:6650/path/to/namespace" - assertServiceUri(t, serviceUri, "pulsar", nil, []string{"host1:6650", "host2:6650", "host3:6650"}, "/path/to/namespace", "") + assertServiceUri(t, serviceUri, "pulsar", nil, []string{"host1:6650", "host2:6650", "host3:6650"}, + "/path/to/namespace", "") } func TestMultipleHostsWithoutPulsarPorts(t *testing.T) { serviceUri := "pulsar://host1,host2,host3/path/to/namespace" - assertServiceUri(t, serviceUri, "pulsar", nil, []string{"host1:6650", "host2:6650", "host3:6650"}, "/path/to/namespace", "") + assertServiceUri(t, serviceUri, "pulsar", nil, []string{"host1:6650", "host2:6650", "host3:6650"}, + "/path/to/namespace", "") } func TestMultipleHostsWithoutPulsarTlsPorts(t *testing.T) { serviceUri := "pulsar+ssl://host1,host2,host3/path/to/namespace" - assertServiceUri(t, serviceUri, "pulsar", []string{"ssl"}, []string{"host1:6651", "host2:6651", "host3:6651"}, "/path/to/namespace", "") + assertServiceUri(t, serviceUri, "pulsar", []string{"ssl"}, []string{"host1:6651", "host2:6651", "host3:6651"}, + "/path/to/namespace", "") } func TestMultipleHostsWithoutHttpPorts(t *testing.T) { @@ -116,22 +126,26 @@ func TestMultipleHostsWithoutHttpPorts(t *testing.T) { func TestMultipleHostsWithoutHttpsPorts(t *testing.T) { serviceUri := "https://host1,host2,host3/path/to/namespace" - assertServiceUri(t, serviceUri, "https", nil, []string{"host1:443", "host2:443", "host3:443"}, "/path/to/namespace", "") + assertServiceUri(t, serviceUri, "https", nil, []string{"host1:443", "host2:443", "host3:443"}, "/path/to/namespace", + "") } func TestMultipleHostsMixedPorts(t *testing.T) { serviceUri := "pulsar://host1:6640,host2:6650,host3:6660/path/to/namespace" - assertServiceUri(t, serviceUri, "pulsar", nil, []string{"host1:6640", "host2:6650", "host3:6660"}, "/path/to/namespace", "") + assertServiceUri(t, serviceUri, "pulsar", nil, []string{"host1:6640", "host2:6650", "host3:6660"}, + "/path/to/namespace", "") } func TestMultipleHostsMixed(t *testing.T) { serviceUri := "pulsar://host1:6640,host2,host3:6660/path/to/namespace" - assertServiceUri(t, serviceUri, "pulsar", nil, []string{"host1:6640", "host2:6650", "host3:6660"}, "/path/to/namespace", "") + assertServiceUri(t, serviceUri, "pulsar", nil, []string{"host1:6640", "host2:6650", "host3:6660"}, + "/path/to/namespace", "") } func TestUserInfoWithMultipleHosts(t *testing.T) { serviceUri := "pulsar://pulsaruser@host1:6650;host2:6650;host3:6650/path/to/namespace" - assertServiceUri(t, serviceUri, "pulsar", nil, []string{"host1:6650", "host2:6650", "host3:6650"}, "/path/to/namespace", "pulsaruser") + assertServiceUri(t, serviceUri, "pulsar", nil, []string{"host1:6650", "host2:6650", "host3:6650"}, + "/path/to/namespace", "pulsaruser") } func testInvalidServiceUri(t *testing.T, serviceUri string) { @@ -153,4 +167,4 @@ func assertServiceUri(t *testing.T, serviceUri, expectedServiceName string, } assert.ElementsMatch(t, expectedServiceInfo, uri.ServiceInfos) assert.ElementsMatch(t, expectedServiceHosts, uri.ServiceHosts) -} \ No newline at end of file +} diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index c64f4a8f1c..f72ba1d1a0 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -587,7 +587,6 @@ func TestReaderLatestInclusiveHasNext(t *testing.T) { assert.False(t, reader.HasNext()) } - func TestReaderWithMultiHosts(t *testing.T) { // Multi hosts included an unreached port and the actual port for verify retry logic client, err := NewClient(ClientOptions{ @@ -638,4 +637,4 @@ func TestReaderWithMultiHosts(t *testing.T) { } assert.Equal(t, 10, i) -} \ No newline at end of file +} From 2593a049726e95c301f201d7c8ac025599e1c73c Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Sat, 6 Mar 2021 10:19:33 +0800 Subject: [PATCH 6/7] style lint --- pulsar/client_impl_test.go | 12 +-- pulsar/internal/service_name_resolver.go | 24 ++--- pulsar/internal/service_name_resolver_test.go | 56 ++++++------ pulsar/internal/service_uri.go | 26 +++--- pulsar/internal/service_uri_test.go | 87 ++++++++++--------- 5 files changed, 103 insertions(+), 102 deletions(-) diff --git a/pulsar/client_impl_test.go b/pulsar/client_impl_test.go index 80e177d879..65732ea81d 100644 --- a/pulsar/client_impl_test.go +++ b/pulsar/client_impl_test.go @@ -479,20 +479,20 @@ func TestRetryWithMultipleHosts(t *testing.T) { defer producer.Close() ctx := context.Background() - var msgIds [][]byte + var msgIDs [][]byte for i := 0; i < 10; i++ { - if msgId, err := producer.Send(ctx, &ProducerMessage{ + if msgID, err := producer.Send(ctx, &ProducerMessage{ Payload: []byte(fmt.Sprintf("hello-%d", i)), }); err != nil { assert.Nil(t, err) } else { - assert.NotNil(t, msgId) - msgIds = append(msgIds, msgId.Serialize()) + assert.NotNil(t, msgID) + msgIDs = append(msgIDs, msgID.Serialize()) } } - assert.Equal(t, 10, len(msgIds)) + assert.Equal(t, 10, len(msgIDs)) consumer, err := client.Subscribe(ConsumerOptions{ Topic: topic, @@ -506,7 +506,7 @@ func TestRetryWithMultipleHosts(t *testing.T) { for i := 0; i < 10; i++ { msg, err := consumer.Receive(context.Background()) assert.Nil(t, err) - assert.Contains(t, msgIds, msg.ID().Serialize()) + assert.Contains(t, msgIDs, msg.ID().Serialize()) consumer.Ack(msg) } diff --git a/pulsar/internal/service_name_resolver.go b/pulsar/internal/service_name_resolver.go index a131577c9c..1943ffd206 100644 --- a/pulsar/internal/service_name_resolver.go +++ b/pulsar/internal/service_name_resolver.go @@ -32,8 +32,8 @@ import ( type ServiceNameResolver interface { ResolveHost() (*url.URL, error) - ResolveHostUri() (*PulsarServiceURI, error) - UpdateServiceUrl(url *url.URL) error + ResolveHostURI() (*PulsarServiceURI, error) + UpdateServiceURL(url *url.URL) error GetServiceURI() *PulsarServiceURI GetServiceURL() *url.URL GetAddressList() []*url.URL @@ -48,7 +48,7 @@ type PulsarServiceNameResolver struct { func NewPulsarServiceNameResolver(url *url.URL) ServiceNameResolver { r := &PulsarServiceNameResolver{} - err := r.UpdateServiceUrl(url) + err := r.UpdateServiceURL(url) if err != nil { log.Errorf("create pulsar service name resolver failed : %v", err) } @@ -70,29 +70,29 @@ func (r *PulsarServiceNameResolver) ResolveHost() (*url.URL, error) { return r.AddressList[idx], nil } -func (r *PulsarServiceNameResolver) ResolveHostUri() (*PulsarServiceURI, error) { +func (r *PulsarServiceNameResolver) ResolveHostURI() (*PulsarServiceURI, error) { host, err := r.ResolveHost() if err != nil { return nil, err } - hostUrl := host.Scheme + "://" + host.Hostname() + ":" + host.Port() - return NewPulsarServiceURIFromUriString(hostUrl) + hostURL := host.Scheme + "://" + host.Hostname() + ":" + host.Port() + return NewPulsarServiceURIFromURIString(hostURL) } -func (r *PulsarServiceNameResolver) UpdateServiceUrl(u *url.URL) error { - uri, err := NewPulsarServiceURIFromUrl(u) +func (r *PulsarServiceNameResolver) UpdateServiceURL(u *url.URL) error { + uri, err := NewPulsarServiceURIFromURL(u) if err != nil { log.Errorf("invalid service-url instance %s provided %v", u, err) return err } hosts := uri.ServiceHosts - var addresses []*url.URL + addresses := []*url.URL{} for _, host := range hosts { - hostUrl := uri.URL.Scheme + "://" + host - u, err := url.Parse(hostUrl) + hostURL := uri.URL.Scheme + "://" + host + u, err := url.Parse(hostURL) if err != nil { - log.Errorf("invalid host-url %s provided %v", hostUrl, err) + log.Errorf("invalid host-url %s provided %v", hostURL, err) return err } addresses = append(addresses, u) diff --git a/pulsar/internal/service_name_resolver_test.go b/pulsar/internal/service_name_resolver_test.go index c40c75467f..e1906339c5 100644 --- a/pulsar/internal/service_name_resolver_test.go +++ b/pulsar/internal/service_name_resolver_test.go @@ -34,7 +34,7 @@ func TestResolveBeforeUpdateServiceUrl(t *testing.T) { func TestResolveUriBeforeUpdateServiceUrl(t *testing.T) { resolver := NewPulsarServiceNameResolver(nil) - u, err := resolver.ResolveHostUri() + u, err := resolver.ResolveHostURI() assert.Nil(t, u) assert.NotNil(t, err) assert.EqualError(t, err, "no service url is provided yet") @@ -43,7 +43,7 @@ func TestResolveUriBeforeUpdateServiceUrl(t *testing.T) { func TestUpdateInvalidServiceUrl(t *testing.T) { resolver := NewPulsarServiceNameResolver(nil) url, _ := url.Parse("pulsar:///") - err := resolver.UpdateServiceUrl(url) + err := resolver.UpdateServiceURL(url) assert.NotNil(t, err) assert.Empty(t, resolver.GetServiceURL()) assert.Nil(t, resolver.GetServiceURI()) @@ -51,25 +51,25 @@ func TestUpdateInvalidServiceUrl(t *testing.T) { func TestSimpleHostUrl(t *testing.T) { resolver := NewPulsarServiceNameResolver(nil) - serviceUrl, _ := url.Parse("pulsar://host1:6650") - err := resolver.UpdateServiceUrl(serviceUrl) + serviceURL, _ := url.Parse("pulsar://host1:6650") + err := resolver.UpdateServiceURL(serviceURL) assert.Nil(t, err) - assert.Equal(t, serviceUrl, resolver.GetServiceURL()) - expectedUri, err := NewPulsarServiceURIFromUrl(serviceUrl) + assert.Equal(t, serviceURL, resolver.GetServiceURL()) + expectedURI, err := NewPulsarServiceURIFromURL(serviceURL) assert.Nil(t, err) - assert.Equal(t, expectedUri, resolver.GetServiceURI()) + assert.Equal(t, expectedURI, resolver.GetServiceURI()) actualHost, err := resolver.ResolveHost() assert.Nil(t, err) assert.Equal(t, "host1", actualHost.Hostname()) assert.Equal(t, "6650", actualHost.Port()) - newServiceUrl, _ := url.Parse("pulsar://host2:6650") - err = resolver.UpdateServiceUrl(newServiceUrl) + newServiceURL, _ := url.Parse("pulsar://host2:6650") + err = resolver.UpdateServiceURL(newServiceURL) assert.Nil(t, err) - assert.Equal(t, newServiceUrl, resolver.GetServiceURL()) - expectedUri, err = NewPulsarServiceURIFromUrl(newServiceUrl) + assert.Equal(t, newServiceURL, resolver.GetServiceURL()) + expectedURI, err = NewPulsarServiceURIFromURL(newServiceURL) assert.Nil(t, err) - assert.Equal(t, expectedUri, resolver.GetServiceURI()) + assert.Equal(t, expectedURI, resolver.GetServiceURI()) actualHost, err = resolver.ResolveHost() assert.Nil(t, err) assert.Equal(t, "host2", actualHost.Hostname()) @@ -78,17 +78,17 @@ func TestSimpleHostUrl(t *testing.T) { func TestMultipleHostsUrl(t *testing.T) { resolver := NewPulsarServiceNameResolver(nil) - serviceUrl, _ := url.Parse("pulsar://host1:6650,host2:6650") - err := resolver.UpdateServiceUrl(serviceUrl) + serviceURL, _ := url.Parse("pulsar://host1:6650,host2:6650") + err := resolver.UpdateServiceURL(serviceURL) assert.Nil(t, err) - assert.Equal(t, serviceUrl, resolver.GetServiceURL()) - expectedUri, err := NewPulsarServiceURIFromUrl(serviceUrl) + assert.Equal(t, serviceURL, resolver.GetServiceURL()) + expectedURI, err := NewPulsarServiceURIFromURL(serviceURL) assert.Nil(t, err) - assert.Equal(t, expectedUri, resolver.GetServiceURI()) + assert.Equal(t, expectedURI, resolver.GetServiceURI()) host1, _ := url.Parse("pulsar://host1:6650") host2, _ := url.Parse("pulsar://host2:6650") - host1uri, _ := NewPulsarServiceURIFromUriString("pulsar://host1:6650") - host2uri, _ := NewPulsarServiceURIFromUriString("pulsar://host2:6650") + host1uri, _ := NewPulsarServiceURIFromURIString("pulsar://host1:6650") + host2uri, _ := NewPulsarServiceURIFromURIString("pulsar://host2:6650") assert.Contains(t, resolver.GetAddressList(), host1) assert.Contains(t, resolver.GetAddressList(), host2) hosts := []*url.URL{host1, host2} @@ -96,7 +96,7 @@ func TestMultipleHostsUrl(t *testing.T) { for i := 0; i < 10; i++ { host, err := resolver.ResolveHost() assert.Nil(t, err) - hosturi, err := resolver.ResolveHostUri() + hosturi, err := resolver.ResolveHostURI() assert.Nil(t, err) assert.Contains(t, hosts, host) assert.Contains(t, hosturis, hosturi) @@ -105,17 +105,17 @@ func TestMultipleHostsUrl(t *testing.T) { func TestMultipleHostsTlsUrl(t *testing.T) { resolver := NewPulsarServiceNameResolver(nil) - serviceUrl, _ := url.Parse("pulsar+ssl://host1:6651,host2:6651") - err := resolver.UpdateServiceUrl(serviceUrl) + serviceURL, _ := url.Parse("pulsar+ssl://host1:6651,host2:6651") + err := resolver.UpdateServiceURL(serviceURL) assert.Nil(t, err) - assert.Equal(t, serviceUrl, resolver.GetServiceURL()) - expectedUri, err := NewPulsarServiceURIFromUrl(serviceUrl) + assert.Equal(t, serviceURL, resolver.GetServiceURL()) + expectedURI, err := NewPulsarServiceURIFromURL(serviceURL) assert.Nil(t, err) - assert.Equal(t, expectedUri, resolver.GetServiceURI()) + assert.Equal(t, expectedURI, resolver.GetServiceURI()) host1, _ := url.Parse("pulsar+ssl://host1:6651") host2, _ := url.Parse("pulsar+ssl://host2:6651") - host1uri, _ := NewPulsarServiceURIFromUriString("pulsar+ssl://host1:6651") - host2uri, _ := NewPulsarServiceURIFromUriString("pulsar+ssl://host2:6651") + host1uri, _ := NewPulsarServiceURIFromURIString("pulsar+ssl://host1:6651") + host2uri, _ := NewPulsarServiceURIFromURIString("pulsar+ssl://host2:6651") assert.Contains(t, resolver.GetAddressList(), host1) assert.Contains(t, resolver.GetAddressList(), host2) hosts := []*url.URL{host1, host2} @@ -123,7 +123,7 @@ func TestMultipleHostsTlsUrl(t *testing.T) { for i := 0; i < 10; i++ { host, err := resolver.ResolveHost() assert.Nil(t, err) - hosturi, err := resolver.ResolveHostUri() + hosturi, err := resolver.ResolveHostURI() assert.Nil(t, err) assert.Contains(t, hosts, host) assert.Contains(t, hosturis, hosturi) diff --git a/pulsar/internal/service_uri.go b/pulsar/internal/service_uri.go index c4181401a3..bb45416fab 100644 --- a/pulsar/internal/service_uri.go +++ b/pulsar/internal/service_uri.go @@ -29,13 +29,13 @@ import ( const ( BinaryService = "pulsar" - HttpService = "http" - HttpsService = "https" + HTTPService = "http" + HTTPSService = "https" SSLService = "ssl" BinaryPort = 6650 BinaryTLSPort = 6651 - HttpPort = 80 - HttpsPort = 443 + HTTPPort = 80 + HTTPSPort = 443 ) // map to ServiceURI.java @@ -47,7 +47,7 @@ type PulsarServiceURI struct { URL *url.URL } -func NewPulsarServiceURIFromUriString(uri string) (*PulsarServiceURI, error) { +func NewPulsarServiceURIFromURIString(uri string) (*PulsarServiceURI, error) { u, err := fromString(uri) if err != nil { log.Error(err) @@ -56,7 +56,7 @@ func NewPulsarServiceURIFromUriString(uri string) (*PulsarServiceURI, error) { return u, nil } -func NewPulsarServiceURIFromUrl(url *url.URL) (*PulsarServiceURI, error) { +func NewPulsarServiceURIFromURL(url *url.URL) (*PulsarServiceURI, error) { u, err := fromURL(url) if err != nil { log.Error(err) @@ -71,7 +71,7 @@ func fromString(uriStr string) (*PulsarServiceURI, error) { } if strings.Contains(uriStr, "[") && strings.Contains(uriStr, "]") { // deal with ipv6 address - hosts := strings.FieldsFunc(uriStr, splitUri) + hosts := strings.FieldsFunc(uriStr, splitURI) if len(hosts) > 1 { // deal with ipv6 address firstHost := hosts[0] @@ -143,7 +143,7 @@ func fromURL(url *url.URL) (*PulsarServiceURI, error) { } var serviceHosts []string - hosts := strings.FieldsFunc(url.Host, splitUri) + hosts := strings.FieldsFunc(url.Host, splitURI) for _, v := range hosts { h, err := validateHostName(serviceName, serviceInfos, v) if err == nil { @@ -162,7 +162,7 @@ func fromURL(url *url.URL) (*PulsarServiceURI, error) { }, nil } -func splitUri(r rune) bool { +func splitURI(r rune) bool { return r == ',' || r == ';' } @@ -203,10 +203,10 @@ func getServicePort(serviceName string, serviceInfos []string) int { } else if len(serviceInfos) == 1 && strings.ToLower(serviceInfos[0]) == SSLService { return BinaryTLSPort } - case HttpService: - return HttpPort - case HttpsService: - return HttpsPort + case HTTPService: + return HTTPPort + case HTTPSService: + return HTTPSPort } return -1 } diff --git a/pulsar/internal/service_uri_test.go b/pulsar/internal/service_uri_test.go index 24b97ec25d..445b325406 100644 --- a/pulsar/internal/service_uri_test.go +++ b/pulsar/internal/service_uri_test.go @@ -34,132 +34,133 @@ func TestInvalidServiceUris(t *testing.T) { } for _, uri := range uris { - testInvalidServiceUri(t, uri) + testInvalidServiceURI(t, uri) } } func TestEmptyServiceUriString(t *testing.T) { - u, err := NewPulsarServiceURIFromUriString("") + u, err := NewPulsarServiceURIFromURIString("") assert.Nil(t, u) assert.NotNil(t, err) } func TestNullServiceUrlInstance(t *testing.T) { - u, err := NewPulsarServiceURIFromUrl(nil) + u, err := NewPulsarServiceURIFromURL(nil) assert.Nil(t, u) assert.NotNil(t, err) } func TestMissingServiceName(t *testing.T) { - serviceUri := "//localhost:6650/path/to/namespace" - assertServiceUri(t, serviceUri, "", nil, []string{"localhost:6650"}, "/path/to/namespace", "") + serviceURI := "//localhost:6650/path/to/namespace" + assertServiceURI(t, serviceURI, "", nil, []string{"localhost:6650"}, "/path/to/namespace", "") } func TestEmptyPath(t *testing.T) { - serviceUri := "pulsar://localhost:6650" - assertServiceUri(t, serviceUri, "pulsar", nil, []string{"localhost:6650"}, "", "") + serviceURI := "pulsar://localhost:6650" + assertServiceURI(t, serviceURI, "pulsar", nil, []string{"localhost:6650"}, "", "") } func TestRootPath(t *testing.T) { - serviceUri := "pulsar://localhost:6650/" - assertServiceUri(t, serviceUri, "pulsar", nil, []string{"localhost:6650"}, "/", "") + serviceURI := "pulsar://localhost:6650/" + assertServiceURI(t, serviceURI, "pulsar", nil, []string{"localhost:6650"}, "/", "") } func TestUserInfo(t *testing.T) { - serviceUri := "pulsar://pulsaruser@localhost:6650/path/to/namespace" - assertServiceUri(t, serviceUri, "pulsar", nil, []string{"localhost:6650"}, "/path/to/namespace", "pulsaruser") + serviceURI := "pulsar://pulsaruser@localhost:6650/path/to/namespace" + assertServiceURI(t, serviceURI, "pulsar", nil, []string{"localhost:6650"}, "/path/to/namespace", "pulsaruser") } func TestIpv6Uri(t *testing.T) { - serviceUri := "pulsar://pulsaruser@[fec0:0:0:ffff::1]:6650/path/to/namespace" - assertServiceUri(t, serviceUri, "pulsar", nil, []string{"[fec0:0:0:ffff::1]:6650"}, "/path/to/namespace", + serviceURI := "pulsar://pulsaruser@[fec0:0:0:ffff::1]:6650/path/to/namespace" + assertServiceURI(t, serviceURI, "pulsar", nil, []string{"[fec0:0:0:ffff::1]:6650"}, "/path/to/namespace", "pulsaruser") } func TestIpv6UriWithoutPulsarPort(t *testing.T) { - serviceUri := "pulsar://[fec0:0:0:ffff::1]/path/to/namespace" - assertServiceUri(t, serviceUri, "pulsar", nil, []string{"[fec0:0:0:ffff::1]:6650"}, "/path/to/namespace", "") + serviceURI := "pulsar://[fec0:0:0:ffff::1]/path/to/namespace" + assertServiceURI(t, serviceURI, "pulsar", nil, []string{"[fec0:0:0:ffff::1]:6650"}, "/path/to/namespace", "") } func TestMultiIpv6Uri(t *testing.T) { - serviceUri := "pulsar://pulsaruser@[fec0:0:0:ffff::1]:6650,[fec0:0:0:ffff::2]:6650;[fec0:0:0:ffff::3]:6650/path/to/namespace" - assertServiceUri(t, serviceUri, "pulsar", nil, + serviceURI := "pulsar://pulsaruser@[fec0:0:0:ffff::1]:6650,[fec0:0:0:ffff::2]:6650;[fec0:0:0:ffff::3]:6650" + + "/path/to/namespace" + assertServiceURI(t, serviceURI, "pulsar", nil, []string{"[fec0:0:0:ffff::1]:6650", "[fec0:0:0:ffff::2]:6650", "[fec0:0:0:ffff::3]:6650"}, "/path/to/namespace", "pulsaruser") } func TestMultiIpv6UriWithoutPulsarPort(t *testing.T) { - serviceUri := "pulsar://pulsaruser@[fec0:0:0:ffff::1],[fec0:0:0:ffff::2];[fec0:0:0:ffff::3]/path/to/namespace" - assertServiceUri(t, serviceUri, "pulsar", nil, + serviceURI := "pulsar://pulsaruser@[fec0:0:0:ffff::1],[fec0:0:0:ffff::2];[fec0:0:0:ffff::3]/path/to/namespace" + assertServiceURI(t, serviceURI, "pulsar", nil, []string{"[fec0:0:0:ffff::1]:6650", "[fec0:0:0:ffff::2]:6650", "[fec0:0:0:ffff::3]:6650"}, "/path/to/namespace", "pulsaruser") } func TestMultipleHostsSemiColon(t *testing.T) { - serviceUri := "pulsar://host1:6650;host2:6650;host3:6650/path/to/namespace" - assertServiceUri(t, serviceUri, "pulsar", nil, []string{"host1:6650", "host2:6650", "host3:6650"}, + serviceURI := "pulsar://host1:6650;host2:6650;host3:6650/path/to/namespace" + assertServiceURI(t, serviceURI, "pulsar", nil, []string{"host1:6650", "host2:6650", "host3:6650"}, "/path/to/namespace", "") } func TestMultipleHostsComma(t *testing.T) { - serviceUri := "pulsar://host1:6650,host2:6650,host3:6650/path/to/namespace" - assertServiceUri(t, serviceUri, "pulsar", nil, []string{"host1:6650", "host2:6650", "host3:6650"}, + serviceURI := "pulsar://host1:6650,host2:6650,host3:6650/path/to/namespace" + assertServiceURI(t, serviceURI, "pulsar", nil, []string{"host1:6650", "host2:6650", "host3:6650"}, "/path/to/namespace", "") } func TestMultipleHostsWithoutPulsarPorts(t *testing.T) { - serviceUri := "pulsar://host1,host2,host3/path/to/namespace" - assertServiceUri(t, serviceUri, "pulsar", nil, []string{"host1:6650", "host2:6650", "host3:6650"}, + serviceURI := "pulsar://host1,host2,host3/path/to/namespace" + assertServiceURI(t, serviceURI, "pulsar", nil, []string{"host1:6650", "host2:6650", "host3:6650"}, "/path/to/namespace", "") } func TestMultipleHostsWithoutPulsarTlsPorts(t *testing.T) { - serviceUri := "pulsar+ssl://host1,host2,host3/path/to/namespace" - assertServiceUri(t, serviceUri, "pulsar", []string{"ssl"}, []string{"host1:6651", "host2:6651", "host3:6651"}, + serviceURI := "pulsar+ssl://host1,host2,host3/path/to/namespace" + assertServiceURI(t, serviceURI, "pulsar", []string{"ssl"}, []string{"host1:6651", "host2:6651", "host3:6651"}, "/path/to/namespace", "") } func TestMultipleHostsWithoutHttpPorts(t *testing.T) { - serviceUri := "http://host1,host2,host3/path/to/namespace" - assertServiceUri(t, serviceUri, "http", nil, []string{"host1:80", "host2:80", "host3:80"}, "/path/to/namespace", "") + serviceURI := "http://host1,host2,host3/path/to/namespace" + assertServiceURI(t, serviceURI, "http", nil, []string{"host1:80", "host2:80", "host3:80"}, "/path/to/namespace", "") } func TestMultipleHostsWithoutHttpsPorts(t *testing.T) { - serviceUri := "https://host1,host2,host3/path/to/namespace" - assertServiceUri(t, serviceUri, "https", nil, []string{"host1:443", "host2:443", "host3:443"}, "/path/to/namespace", + serviceURI := "https://host1,host2,host3/path/to/namespace" + assertServiceURI(t, serviceURI, "https", nil, []string{"host1:443", "host2:443", "host3:443"}, "/path/to/namespace", "") } func TestMultipleHostsMixedPorts(t *testing.T) { - serviceUri := "pulsar://host1:6640,host2:6650,host3:6660/path/to/namespace" - assertServiceUri(t, serviceUri, "pulsar", nil, []string{"host1:6640", "host2:6650", "host3:6660"}, + serviceURI := "pulsar://host1:6640,host2:6650,host3:6660/path/to/namespace" + assertServiceURI(t, serviceURI, "pulsar", nil, []string{"host1:6640", "host2:6650", "host3:6660"}, "/path/to/namespace", "") } func TestMultipleHostsMixed(t *testing.T) { - serviceUri := "pulsar://host1:6640,host2,host3:6660/path/to/namespace" - assertServiceUri(t, serviceUri, "pulsar", nil, []string{"host1:6640", "host2:6650", "host3:6660"}, + serviceURI := "pulsar://host1:6640,host2,host3:6660/path/to/namespace" + assertServiceURI(t, serviceURI, "pulsar", nil, []string{"host1:6640", "host2:6650", "host3:6660"}, "/path/to/namespace", "") } func TestUserInfoWithMultipleHosts(t *testing.T) { - serviceUri := "pulsar://pulsaruser@host1:6650;host2:6650;host3:6650/path/to/namespace" - assertServiceUri(t, serviceUri, "pulsar", nil, []string{"host1:6650", "host2:6650", "host3:6650"}, + serviceURI := "pulsar://pulsaruser@host1:6650;host2:6650;host3:6650/path/to/namespace" + assertServiceURI(t, serviceURI, "pulsar", nil, []string{"host1:6650", "host2:6650", "host3:6650"}, "/path/to/namespace", "pulsaruser") } -func testInvalidServiceUri(t *testing.T, serviceUri string) { - u, err := NewPulsarServiceURIFromUriString(serviceUri) - t.Logf("testInvalidServiceUri %s", serviceUri) +func testInvalidServiceURI(t *testing.T, serviceURI string) { + u, err := NewPulsarServiceURIFromURIString(serviceURI) + t.Logf("testInvalidServiceURI %s", serviceURI) assert.Nil(t, u) assert.NotNil(t, err) } -func assertServiceUri(t *testing.T, serviceUri, expectedServiceName string, +func assertServiceURI(t *testing.T, serviceURI, expectedServiceName string, expectedServiceInfo, expectedServiceHosts []string, expectedServicePath, expectedServiceUser string) { - uri, err := NewPulsarServiceURIFromUriString(serviceUri) + uri, err := NewPulsarServiceURIFromURIString(serviceURI) assert.Nil(t, err) - assert.NotNil(t, serviceUri) + assert.NotNil(t, serviceURI) assert.Equal(t, expectedServiceName, uri.ServiceName) assert.Equal(t, expectedServicePath, uri.servicePath) if expectedServiceUser != "" { From 0dc9712054bdb2a1006c612dcdceed8224b7caea Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Sat, 6 Mar 2021 10:22:28 +0800 Subject: [PATCH 7/7] not export PulsarServiceNameResolver --- pulsar/internal/service_name_resolver.go | 18 ++++++++---------- pulsar/internal/service_uri.go | 1 - 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/pulsar/internal/service_name_resolver.go b/pulsar/internal/service_name_resolver.go index 1943ffd206..3b1209cc4e 100644 --- a/pulsar/internal/service_name_resolver.go +++ b/pulsar/internal/service_name_resolver.go @@ -28,8 +28,6 @@ import ( log "github.com/sirupsen/logrus" ) -// Map: PulsarServiceNameResolver.java - type ServiceNameResolver interface { ResolveHost() (*url.URL, error) ResolveHostURI() (*PulsarServiceURI, error) @@ -39,7 +37,7 @@ type ServiceNameResolver interface { GetAddressList() []*url.URL } -type PulsarServiceNameResolver struct { +type pulsarServiceNameResolver struct { ServiceURI *PulsarServiceURI ServiceURL *url.URL CurrentIndex int32 @@ -47,7 +45,7 @@ type PulsarServiceNameResolver struct { } func NewPulsarServiceNameResolver(url *url.URL) ServiceNameResolver { - r := &PulsarServiceNameResolver{} + r := &pulsarServiceNameResolver{} err := r.UpdateServiceURL(url) if err != nil { log.Errorf("create pulsar service name resolver failed : %v", err) @@ -55,7 +53,7 @@ func NewPulsarServiceNameResolver(url *url.URL) ServiceNameResolver { return r } -func (r *PulsarServiceNameResolver) ResolveHost() (*url.URL, error) { +func (r *pulsarServiceNameResolver) ResolveHost() (*url.URL, error) { if r.AddressList == nil { return nil, errors.New("no service url is provided yet") } @@ -70,7 +68,7 @@ func (r *PulsarServiceNameResolver) ResolveHost() (*url.URL, error) { return r.AddressList[idx], nil } -func (r *PulsarServiceNameResolver) ResolveHostURI() (*PulsarServiceURI, error) { +func (r *pulsarServiceNameResolver) ResolveHostURI() (*PulsarServiceURI, error) { host, err := r.ResolveHost() if err != nil { return nil, err @@ -79,7 +77,7 @@ func (r *PulsarServiceNameResolver) ResolveHostURI() (*PulsarServiceURI, error) return NewPulsarServiceURIFromURIString(hostURL) } -func (r *PulsarServiceNameResolver) UpdateServiceURL(u *url.URL) error { +func (r *pulsarServiceNameResolver) UpdateServiceURL(u *url.URL) error { uri, err := NewPulsarServiceURIFromURL(u) if err != nil { log.Errorf("invalid service-url instance %s provided %v", u, err) @@ -105,14 +103,14 @@ func (r *PulsarServiceNameResolver) UpdateServiceURL(u *url.URL) error { return nil } -func (r *PulsarServiceNameResolver) GetServiceURI() *PulsarServiceURI { +func (r *pulsarServiceNameResolver) GetServiceURI() *PulsarServiceURI { return r.ServiceURI } -func (r *PulsarServiceNameResolver) GetServiceURL() *url.URL { +func (r *pulsarServiceNameResolver) GetServiceURL() *url.URL { return r.ServiceURL } -func (r *PulsarServiceNameResolver) GetAddressList() []*url.URL { +func (r *pulsarServiceNameResolver) GetAddressList() []*url.URL { return r.AddressList } diff --git a/pulsar/internal/service_uri.go b/pulsar/internal/service_uri.go index bb45416fab..7b7533987a 100644 --- a/pulsar/internal/service_uri.go +++ b/pulsar/internal/service_uri.go @@ -38,7 +38,6 @@ const ( HTTPSPort = 443 ) -// map to ServiceURI.java type PulsarServiceURI struct { ServiceName string ServiceInfos []string