Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue 177] add multiple hosts support #484

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ github.com/danieljoos/wincred v1.0.2/go.mod h1:SnuYRW9lp1oJrZX/dXJqr0cPK5gYXqx3E
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1 h1:CaO/zOnF8VvUfEbhRatPcwKVWamvbYd8tQGRWacE9kU=
github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1/go.mod h1:+hnT3ywWDTAFrW5aE+u2Sa/wT555ZqwoCS+pk3p6ry4=
github.com/dimfeld/httptreemux v5.0.1+incompatible h1:Qj3gVcDNoOthBAqftuD596rm4wg/adLLz5xh5CmpiCA=
github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0=
Expand Down
6 changes: 4 additions & 2 deletions pulsar/client_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,10 @@ func newClient(options ClientOptions) (Client, error) {
log: logger,
metrics: metrics,
}
c.rpcClient = internal.NewRPCClient(url, c.cnxPool, operationTimeout, logger, metrics)
c.lookupService = internal.NewLookupService(c.rpcClient, url, tlsConfig != nil, logger, metrics)
serviceNameResolver := internal.NewPulsarServiceNameResolver(url)

c.rpcClient = internal.NewRPCClient(url, serviceNameResolver, c.cnxPool, operationTimeout, logger, metrics)
c.lookupService = internal.NewLookupService(c.rpcClient, url, serviceNameResolver, tlsConfig != nil, logger, metrics)
c.handlers = internal.NewClientHandlers()

return c, nil
Expand Down
56 changes: 56 additions & 0 deletions pulsar/client_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package pulsar

import (
"context"
"crypto/tls"
"fmt"
"io/ioutil"
Expand Down Expand Up @@ -458,3 +459,58 @@ func anonymousNamespacePolicy() map[string]interface{} {
"replication_clusters": []string{"standalone"},
}
}

// TestRetryWithMultipleHosts checks that a client configured with several
// service hosts can still produce and consume messages when one of the
// listed hosts cannot be reached.
func TestRetryWithMultipleHosts(t *testing.T) {
	// The service URL lists an unreachable port (6600) alongside the actual
	// broker port (6650) so that the retry logic is exercised.
	client, err := NewClient(ClientOptions{
		URL: "pulsar://localhost:6600,localhost:6650",
	})
	// Stop right away on failure: the returned client may be nil, and the
	// deferred Close would then panic and hide the real error.
	if !assert.Nil(t, err) {
		t.FailNow()
	}
	defer client.Close()

	topic := "persistent://public/default/retry-multiple-hosts-" + generateRandomName()

	producer, err := client.CreateProducer(ProducerOptions{
		Topic: topic,
	})
	if !assert.Nil(t, err) {
		t.FailNow()
	}
	defer producer.Close()

	ctx := context.Background()
	var msgIDs [][]byte

	for i := 0; i < 10; i++ {
		msgID, err := producer.Send(ctx, &ProducerMessage{
			Payload: []byte(fmt.Sprintf("hello-%d", i)),
		})
		if !assert.Nil(t, err) {
			continue
		}
		// Only serialize IDs that are actually present; a nil ID has already
		// been reported by the assertion.
		if assert.NotNil(t, msgID) {
			msgIDs = append(msgIDs, msgID.Serialize())
		}
	}

	// The receive loop below waits for exactly 10 messages with a context that
	// never expires, so bail out here instead of hanging if any send failed.
	if !assert.Equal(t, 10, len(msgIDs)) {
		t.FailNow()
	}

	consumer, err := client.Subscribe(ConsumerOptions{
		Topic:                       topic,
		SubscriptionName:            "retry-multi-hosts-sub",
		Type:                        Shared,
		SubscriptionInitialPosition: SubscriptionPositionEarliest,
	})
	if !assert.Nil(t, err) {
		t.FailNow()
	}
	defer consumer.Close()

	for i := 0; i < 10; i++ {
		msg, err := consumer.Receive(ctx)
		// msg is dereferenced below, so a failed receive must end the test.
		if !assert.Nil(t, err) {
			t.FailNow()
		}
		assert.Contains(t, msgIDs, msg.ID().Serialize())
		consumer.Ack(msg)
	}

	err = consumer.Unsubscribe()
	assert.Nil(t, err)
}
4 changes: 4 additions & 0 deletions pulsar/internal/connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.U
}

if err := cnx.waitUntilReady(); err != nil {
if !wasCached {
p.pool.Delete(key)
p.log.Debug("Removed failed connection from pool:", cnx.logicalAddr, cnx.physicalAddr)
}
return nil, err
}
return cnx, nil
Expand Down
27 changes: 15 additions & 12 deletions pulsar/internal/lookup_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,22 +45,22 @@ type LookupService interface {
}

type lookupService struct {
rpcClient RPCClient
serviceURL *url.URL
tlsEnabled bool
log log.Logger
metrics *Metrics
rpcClient RPCClient
serviceNameResolver ServiceNameResolver
tlsEnabled bool
log log.Logger
metrics *Metrics
}

// NewLookupService initializes a lookup service struct and returns it as a LookupService.
func NewLookupService(rpcClient RPCClient, serviceURL *url.URL,
func NewLookupService(rpcClient RPCClient, serviceURL *url.URL, serviceNameResolver ServiceNameResolver,
tlsEnabled bool, logger log.Logger, metrics *Metrics) LookupService {
return &lookupService{
rpcClient: rpcClient,
serviceURL: serviceURL,
tlsEnabled: tlsEnabled,
log: logger.SubLogger(log.Fields{"serviceURL": serviceURL}),
metrics: metrics,
rpcClient: rpcClient,
serviceNameResolver: serviceNameResolver,
tlsEnabled: tlsEnabled,
log: logger.SubLogger(log.Fields{"serviceURL": serviceURL}),
metrics: metrics,
}
}

Expand All @@ -78,7 +78,10 @@ func (ls *lookupService) getBrokerAddress(lr *pb.CommandLookupTopicResponse) (lo

var physicalAddr *url.URL
if lr.GetProxyThroughServiceUrl() {
physicalAddr = ls.serviceURL
physicalAddr, err = ls.serviceNameResolver.ResolveHost()
if err != nil {
return nil, nil, err
}
} else {
physicalAddr = logicalAddress
}
Expand Down
55 changes: 46 additions & 9 deletions pulsar/internal/lookup_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func responseType(r pb.CommandLookupTopicResponse_LookupType) *pb.CommandLookupT
func TestLookupSuccess(t *testing.T) {
url, err := url.Parse("pulsar://example:6650")
assert.NoError(t, err)
serviceNameResolver := NewPulsarServiceNameResolver(url)

ls := NewLookupService(&mockedLookupRPCClient{
t: t,
Expand All @@ -131,7 +132,7 @@ func TestLookupSuccess(t *testing.T) {
BrokerServiceUrl: proto.String("pulsar://broker-1:6650"),
},
},
}, url, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
}, url, serviceNameResolver, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))

lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
Expand All @@ -144,6 +145,7 @@ func TestLookupSuccess(t *testing.T) {
func TestTlsLookupSuccess(t *testing.T) {
url, err := url.Parse("pulsar+ssl://example:6651")
assert.NoError(t, err)
serviceNameResolver := NewPulsarServiceNameResolver(url)

ls := NewLookupService(&mockedLookupRPCClient{
t: t,
Expand All @@ -163,7 +165,7 @@ func TestTlsLookupSuccess(t *testing.T) {
BrokerServiceUrlTls: proto.String("pulsar+ssl://broker-1:6651"),
},
},
}, url, true, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
}, url, serviceNameResolver, true, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))

lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
Expand All @@ -176,6 +178,7 @@ func TestTlsLookupSuccess(t *testing.T) {
func TestLookupWithProxy(t *testing.T) {
url, err := url.Parse("pulsar://example:6650")
assert.NoError(t, err)
serviceNameResolver := NewPulsarServiceNameResolver(url)

ls := NewLookupService(&mockedLookupRPCClient{
t: t,
Expand All @@ -196,7 +199,7 @@ func TestLookupWithProxy(t *testing.T) {
ProxyThroughServiceUrl: proto.Bool(true),
},
},
}, url, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
}, url, serviceNameResolver, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))

lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
Expand Down Expand Up @@ -229,7 +232,7 @@ func TestTlsLookupWithProxy(t *testing.T) {
ProxyThroughServiceUrl: proto.Bool(true),
},
},
}, url, true, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
}, url, NewPulsarServiceNameResolver(url), true, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))

lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
Expand Down Expand Up @@ -273,7 +276,7 @@ func TestLookupWithRedirect(t *testing.T) {
BrokerServiceUrl: proto.String("pulsar://broker-1:6650"),
},
},
}, url, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
}, url, NewPulsarServiceNameResolver(url), false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))

lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
Expand Down Expand Up @@ -317,7 +320,7 @@ func TestTlsLookupWithRedirect(t *testing.T) {
BrokerServiceUrlTls: proto.String("pulsar+ssl://broker-1:6651"),
},
},
}, url, true, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
}, url, NewPulsarServiceNameResolver(url), true, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))

lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
Expand Down Expand Up @@ -350,7 +353,7 @@ func TestLookupWithInvalidUrlResponse(t *testing.T) {
ProxyThroughServiceUrl: proto.Bool(false),
},
},
}, url, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
}, url, NewPulsarServiceNameResolver(url), false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))

lr, err := ls.Lookup("my-topic")
assert.Error(t, err)
Expand Down Expand Up @@ -378,7 +381,7 @@ func TestLookupWithLookupFailure(t *testing.T) {
Authoritative: proto.Bool(true),
},
},
}, url, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
}, url, NewPulsarServiceNameResolver(url), false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))

lr, err := ls.Lookup("my-topic")
assert.Error(t, err)
Expand Down Expand Up @@ -447,6 +450,7 @@ func (m mockedPartitionedTopicMetadataRPCClient) RequestOnCnx(cnx Connection, re
func TestGetPartitionedTopicMetadataSuccess(t *testing.T) {
url, err := url.Parse("pulsar://example:6650")
assert.NoError(t, err)
serviceNameResolver := NewPulsarServiceNameResolver(url)

ls := NewLookupService(&mockedPartitionedTopicMetadataRPCClient{
t: t,
Expand All @@ -464,10 +468,43 @@ func TestGetPartitionedTopicMetadataSuccess(t *testing.T) {
Response: pb.CommandPartitionedTopicMetadataResponse_Success.Enum(),
},
},
}, url, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
}, url, serviceNameResolver, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))

metadata, err := ls.GetPartitionedTopicMetadata("my-topic")
assert.NoError(t, err)
assert.NotNil(t, metadata)
assert.Equal(t, metadata.GetPartitions(), uint32(1))
}

// TestLookupSuccessWithMultipleHosts runs a topic lookup through a lookup
// service built from a multi-host service URL and expects both the logical
// and the physical address to be the broker URL from the mocked response.
func TestLookupSuccessWithMultipleHosts(t *testing.T) {
	const brokerURL = "pulsar://broker-1:6650"

	serviceURL, err := url.Parse("pulsar://host1,host2,host3:6650")
	assert.NoError(t, err)
	resolver := NewPulsarServiceNameResolver(serviceURL)

	// The mock expects a single non-authoritative lookup and answers it with
	// a Connect response.
	rpc := &mockedLookupRPCClient{
		t: t,

		expectedRequests: []pb.CommandLookupTopic{
			{
				RequestId:     proto.Uint64(1),
				Topic:         proto.String("my-topic"),
				Authoritative: proto.Bool(false),
			},
		},
		mockedResponses: []pb.CommandLookupTopicResponse{
			{
				RequestId:        proto.Uint64(1),
				Response:         responseType(pb.CommandLookupTopicResponse_Connect),
				Authoritative:    proto.Bool(true),
				BrokerServiceUrl: proto.String(brokerURL),
			},
		},
	}

	ls := NewLookupService(
		rpc,
		serviceURL,
		resolver,
		false,
		log.DefaultNopLogger(),
		NewMetricsProvider(map[string]string{}),
	)

	lr, err := ls.Lookup("my-topic")
	assert.NoError(t, err)
	assert.NotNil(t, lr)

	assert.Equal(t, brokerURL, lr.LogicalAddr.String())
	assert.Equal(t, brokerURL, lr.PhysicalAddr.String())
}
35 changes: 21 additions & 14 deletions pulsar/internal/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type RPCClient interface {
}

type rpcClient struct {
serviceURL *url.URL
serviceNameResolver ServiceNameResolver
pool ConnectionPool
requestTimeout time.Duration
requestIDGenerator uint64
Expand All @@ -66,22 +66,26 @@ type rpcClient struct {
metrics *Metrics
}

func NewRPCClient(serviceURL *url.URL, pool ConnectionPool,
func NewRPCClient(serviceURL *url.URL, serviceNameResolver ServiceNameResolver, pool ConnectionPool,
requestTimeout time.Duration, logger log.Logger, metrics *Metrics) RPCClient {
return &rpcClient{
serviceURL: serviceURL,
pool: pool,
requestTimeout: requestTimeout,
log: logger.SubLogger(log.Fields{"serviceURL": serviceURL}),
metrics: metrics,
serviceNameResolver: serviceNameResolver,
pool: pool,
requestTimeout: requestTimeout,
log: logger.SubLogger(log.Fields{"serviceURL": serviceURL}),
metrics: metrics,
}
}

func (c *rpcClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_Type,
message proto.Message) (*RPCResult, error) {

rpcResult, err := c.Request(c.serviceURL, c.serviceURL, requestID, cmdType, message)
if _, ok := err.(net.Error); ok {
host, err := c.serviceNameResolver.ResolveHost()
if err != nil {
c.log.Errorf("request host resolve failed with error: {%v}", err)
return nil, err
}
rpcResult, err := c.Request(host, host, requestID, cmdType, message)
if _, ok := err.(net.Error); ok || (err != nil && err.Error() == "connection error") {
// We can retry this kind of request over a connection error because it is
// not specific to a particular broker.
backoff := Backoff{100 * time.Millisecond}
Expand All @@ -92,17 +96,20 @@ func (c *rpcClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_
retryTime = backoff.Next()
c.log.Debugf("Retrying request in {%v} with timeout in {%v}", retryTime, c.requestTimeout)
time.Sleep(retryTime)

rpcResult, err = c.Request(c.serviceURL, c.serviceURL, requestID, cmdType, message)
if _, ok := err.(net.Error); ok {
host, err = c.serviceNameResolver.ResolveHost()
if err != nil {
c.log.Errorf("Retrying request host resolve failed with error: {%v}", err)
continue
}
rpcResult, err = c.Request(host, host, requestID, cmdType, message)
if _, ok := err.(net.Error); ok || (err != nil && err.Error() == "connection error") {
continue
} else {
// We either succeeded or encountered a non connection error
break
}
}
}

return rpcResult, err
}

Expand Down
Loading