Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix #472] move GetPartitionedTopicMetadata to lookup service #478

Merged
merged 2 commits into from
Mar 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 h1:HuIa8hRrWRSrqYzx1qI49NNxhdi2PrY7gxVSq1JjLDc=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI=
Expand All @@ -178,7 +177,6 @@ golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7 h1:AeiKBIuRw3UomYXSbLy0Mc2dDLfdtbT/IVn4keq83P0=
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20201021035429-f5854403a974 h1:IX6qOQeG5uLjB/hjjwjedwfjND0hgjPMMyO1RoIXQNI=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
Expand All @@ -204,12 +202,10 @@ golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1 h1:ogLJMz+qpzav7lGMh10LMvAkM/fAoGlaiiHYiFYdm80=
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
Expand All @@ -220,7 +216,6 @@ golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roY
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand All @@ -247,7 +242,6 @@ gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.7 h1:VUgggvou5XRW9mHwD/yXxIYSMtY0zoKQf/v226p2nyo=
gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
9 changes: 1 addition & 8 deletions pulsar/client_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,17 +163,10 @@ func (c *client) TopicPartitions(topic string) ([]string, error) {
return nil, err
}

id := c.rpcClient.NewRequestID()
res, err := c.rpcClient.RequestToAnyBroker(id, pb.BaseCommand_PARTITIONED_METADATA,
&pb.CommandPartitionedTopicMetadata{
RequestId: &id,
Topic: &topicName.Name,
})
r, err := c.lookupService.GetPartitionedTopicMetadata(topic)
if err != nil {
return nil, err
}

r := res.Response.PartitionMetadataResponse
if r != nil {
if r.Error != nil {
return nil, newError(ResultLookupError, r.GetError().String())
Expand Down
26 changes: 26 additions & 0 deletions pulsar/internal/lookup_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ type LookupService interface {
// Lookup perform a lookup for the given topic, confirm the location of the broker
// where the topic is located, and return the LookupResult.
Lookup(topic string) (*LookupResult, error)

// GetPartitionedTopicMetadata perform a CommandPartitionedTopicMetadata request for
// the given topic, returns the CommandPartitionedTopicMetadataResponse as the result.
GetPartitionedTopicMetadata(topic string) (*pb.CommandPartitionedTopicMetadataResponse, error)
}

type lookupService struct {
Expand Down Expand Up @@ -150,3 +154,25 @@ func (ls *lookupService) Lookup(topic string) (*LookupResult, error) {

return nil, errors.New("exceeded max number of redirection during topic lookup")
}

func (ls *lookupService) GetPartitionedTopicMetadata(topic string) (*pb.CommandPartitionedTopicMetadataResponse,
error) {
ls.metrics.PartitionedTopicMetadataRequestsCount.Inc()
topicName, err := ParseTopicName(topic)
if err != nil {
return nil, err
}

id := ls.rpcClient.NewRequestID()
res, err := ls.rpcClient.RequestToAnyBroker(id, pb.BaseCommand_PARTITIONED_METADATA,
&pb.CommandPartitionedTopicMetadata{
RequestId: &id,
Topic: &topicName.Name,
})
if err != nil {
return nil, err
}
ls.log.Debugf("Got topic{%s} partitioned metadata response: %+v", topic, res)

return res.Response.PartitionMetadataResponse, nil
}
120 changes: 104 additions & 16 deletions pulsar/internal/lookup_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"github.com/apache/pulsar-client-go/pulsar/log"
)

type mockedRPCClient struct {
type mockedLookupRPCClient struct {
requestIDGenerator uint64
t *testing.T

Expand All @@ -38,20 +38,20 @@ type mockedRPCClient struct {
}

// Create a new unique request id
func (c *mockedRPCClient) NewRequestID() uint64 {
func (c *mockedLookupRPCClient) NewRequestID() uint64 {
c.requestIDGenerator++
return c.requestIDGenerator
}

func (c *mockedRPCClient) NewProducerID() uint64 {
func (c *mockedLookupRPCClient) NewProducerID() uint64 {
return 1
}

func (c *mockedRPCClient) NewConsumerID() uint64 {
func (c *mockedLookupRPCClient) NewConsumerID() uint64 {
return 1
}

func (c *mockedRPCClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_Type,
func (c *mockedLookupRPCClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_Type,
message proto.Message) (*RPCResult, error) {
assert.Equal(c.t, cmdType, pb.BaseCommand_LOOKUP)

Expand All @@ -71,7 +71,7 @@ func (c *mockedRPCClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCo
}, nil
}

func (c *mockedRPCClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64,
func (c *mockedLookupRPCClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64,
cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) {
assert.Equal(c.t, cmdType, pb.BaseCommand_LOOKUP)
expectedRequest := &c.expectedRequests[0]
Expand All @@ -93,13 +93,14 @@ func (c *mockedRPCClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, r
}, nil
}

func (c *mockedRPCClient) RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type,
func (c *mockedLookupRPCClient) RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type,
message proto.Message) (*RPCResult, error) {
assert.Fail(c.t, "Shouldn't be called")
return nil, nil
}

func (c *mockedRPCClient) RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message) error {
func (c *mockedLookupRPCClient) RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type,
message proto.Message) error {
assert.Fail(c.t, "Shouldn't be called")
return nil
}
Expand All @@ -112,7 +113,7 @@ func TestLookupSuccess(t *testing.T) {
url, err := url.Parse("pulsar://example:6650")
assert.NoError(t, err)

ls := NewLookupService(&mockedRPCClient{
ls := NewLookupService(&mockedLookupRPCClient{
t: t,

expectedRequests: []pb.CommandLookupTopic{
Expand Down Expand Up @@ -144,7 +145,7 @@ func TestTlsLookupSuccess(t *testing.T) {
url, err := url.Parse("pulsar+ssl://example:6651")
assert.NoError(t, err)

ls := NewLookupService(&mockedRPCClient{
ls := NewLookupService(&mockedLookupRPCClient{
t: t,

expectedRequests: []pb.CommandLookupTopic{
Expand Down Expand Up @@ -176,7 +177,7 @@ func TestLookupWithProxy(t *testing.T) {
url, err := url.Parse("pulsar://example:6650")
assert.NoError(t, err)

ls := NewLookupService(&mockedRPCClient{
ls := NewLookupService(&mockedLookupRPCClient{
t: t,

expectedRequests: []pb.CommandLookupTopic{
Expand Down Expand Up @@ -209,7 +210,7 @@ func TestTlsLookupWithProxy(t *testing.T) {
url, err := url.Parse("pulsar+ssl://example:6651")
assert.NoError(t, err)

ls := NewLookupService(&mockedRPCClient{
ls := NewLookupService(&mockedLookupRPCClient{
t: t,

expectedRequests: []pb.CommandLookupTopic{
Expand Down Expand Up @@ -242,7 +243,7 @@ func TestLookupWithRedirect(t *testing.T) {
url, err := url.Parse("pulsar://example:6650")
assert.NoError(t, err)

ls := NewLookupService(&mockedRPCClient{
ls := NewLookupService(&mockedLookupRPCClient{
t: t,
expectedURL: "pulsar://broker-2:6650",

Expand Down Expand Up @@ -286,7 +287,7 @@ func TestTlsLookupWithRedirect(t *testing.T) {
url, err := url.Parse("pulsar+ssl://example:6651")
assert.NoError(t, err)

ls := NewLookupService(&mockedRPCClient{
ls := NewLookupService(&mockedLookupRPCClient{
t: t,
expectedURL: "pulsar+ssl://broker-2:6651",

Expand Down Expand Up @@ -330,7 +331,7 @@ func TestLookupWithInvalidUrlResponse(t *testing.T) {
url, err := url.Parse("pulsar://example:6650")
assert.NoError(t, err)

ls := NewLookupService(&mockedRPCClient{
ls := NewLookupService(&mockedLookupRPCClient{
t: t,

expectedRequests: []pb.CommandLookupTopic{
Expand Down Expand Up @@ -360,7 +361,7 @@ func TestLookupWithLookupFailure(t *testing.T) {
url, err := url.Parse("pulsar://example:6650")
assert.NoError(t, err)

ls := NewLookupService(&mockedRPCClient{
ls := NewLookupService(&mockedLookupRPCClient{
t: t,

expectedRequests: []pb.CommandLookupTopic{
Expand All @@ -383,3 +384,90 @@ func TestLookupWithLookupFailure(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, lr)
}

type mockedPartitionedTopicMetadataRPCClient struct {
requestIDGenerator uint64
t *testing.T

expectedRequests []pb.CommandPartitionedTopicMetadata
mockedResponses []pb.CommandPartitionedTopicMetadataResponse
}

func (m mockedPartitionedTopicMetadataRPCClient) NewRequestID() uint64 {
m.requestIDGenerator++
return m.requestIDGenerator
}

func (m mockedPartitionedTopicMetadataRPCClient) NewProducerID() uint64 {
return 1
}

func (m mockedPartitionedTopicMetadataRPCClient) NewConsumerID() uint64 {
return 1
}

func (m mockedPartitionedTopicMetadataRPCClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_Type,
message proto.Message) (*RPCResult, error) {
assert.Equal(m.t, cmdType, pb.BaseCommand_PARTITIONED_METADATA)

expectedRequest := &m.expectedRequests[0]
m.expectedRequests = m.expectedRequests[1:]

assert.Equal(m.t, *expectedRequest.RequestId, requestID)

mockedResponse := &m.mockedResponses[0]
m.mockedResponses = m.mockedResponses[1:]

return &RPCResult{
&pb.BaseCommand{
PartitionMetadataResponse: mockedResponse,
},
nil,
}, nil
}

func (m mockedPartitionedTopicMetadataRPCClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64,
cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) {
assert.Fail(m.t, "Shouldn't be called")
return nil, nil
}

func (m mockedPartitionedTopicMetadataRPCClient) RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type,
message proto.Message) error {
assert.Fail(m.t, "Shouldn't be called")
return nil
}

func (m mockedPartitionedTopicMetadataRPCClient) RequestOnCnx(cnx Connection, requestID uint64,
cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) {
assert.Fail(m.t, "Shouldn't be called")
return nil, nil
}

func TestGetPartitionedTopicMetadataSuccess(t *testing.T) {
url, err := url.Parse("pulsar://example:6650")
assert.NoError(t, err)

ls := NewLookupService(&mockedPartitionedTopicMetadataRPCClient{
t: t,

expectedRequests: []pb.CommandPartitionedTopicMetadata{
{
RequestId: proto.Uint64(1),
Topic: proto.String("my-topic"),
},
},
mockedResponses: []pb.CommandPartitionedTopicMetadataResponse{
{
RequestId: proto.Uint64(1),
Partitions: proto.Uint32(1),
Response: pb.CommandPartitionedTopicMetadataResponse_Success.Enum(),
},
},
}, url, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))

metadata, err := ls.GetPartitionedTopicMetadata("my-topic")
assert.NoError(t, err)
assert.NotNil(t, metadata)
assert.Equal(t, metadata.GetPartitions(), uint32(1))
}
20 changes: 14 additions & 6 deletions pulsar/internal/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,13 @@ type Metrics struct {
readersClosed *prometheus.CounterVec

// Metrics that are not labeled with topic, are immediately available
ConnectionsOpened prometheus.Counter
ConnectionsClosed prometheus.Counter
ConnectionsEstablishmentErrors prometheus.Counter
ConnectionsHandshakeErrors prometheus.Counter
LookupRequestsCount prometheus.Counter
RPCRequestCount prometheus.Counter
ConnectionsOpened prometheus.Counter
ConnectionsClosed prometheus.Counter
ConnectionsEstablishmentErrors prometheus.Counter
ConnectionsHandshakeErrors prometheus.Counter
LookupRequestsCount prometheus.Counter
PartitionedTopicMetadataRequestsCount prometheus.Counter
RPCRequestCount prometheus.Counter
}

type TopicMetrics struct {
Expand Down Expand Up @@ -268,6 +269,12 @@ func NewMetricsProvider(userDefinedLabels map[string]string) *Metrics {
ConstLabels: constLabels,
}),

PartitionedTopicMetadataRequestsCount: prometheus.NewCounter(prometheus.CounterOpts{
Name: "pulsar_client_partitioned_topic_metadata_count",
Help: "Counter of partitioned_topic_metadata requests made by the client",
ConstLabels: constLabels,
}),

RPCRequestCount: prometheus.NewCounter(prometheus.CounterOpts{
Name: "pulsar_client_rpc_count",
Help: "Counter of RPC requests made by the client",
Expand Down Expand Up @@ -306,6 +313,7 @@ func NewMetricsProvider(userDefinedLabels map[string]string) *Metrics {
prometheus.DefaultRegisterer.Register(metrics.ConnectionsEstablishmentErrors)
prometheus.DefaultRegisterer.Register(metrics.ConnectionsHandshakeErrors)
prometheus.DefaultRegisterer.Register(metrics.LookupRequestsCount)
prometheus.DefaultRegisterer.Register(metrics.PartitionedTopicMetadataRequestsCount)
prometheus.DefaultRegisterer.Register(metrics.RPCRequestCount)
return metrics
}
Expand Down