Skip to content

Commit

Permalink
#540 fix lookup service not implemented GetTopicsOfNamespace (#541)
Browse files Browse the repository at this point in the history
Fixes #540

### Motivation

When using regex topics consumer with HTTP client URL, the client will fail to query namespace topics and throw failed handshake error. This is because the client sends protobuf requests to HTTP service.

### Modifications

add GetTopicsOfNamespace to LookupService interface
add related tests

### Verifying this change

- [x] Make sure that the change passes the CI checks.
  • Loading branch information
freeznet authored Jun 17, 2021
1 parent 579984e commit 55c15ef
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 26 deletions.
21 changes: 0 additions & 21 deletions pulsar/client_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,10 @@ import (
"net/url"
"time"

"github.com/gogo/protobuf/proto"

"github.com/sirupsen/logrus"

"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/pulsar/internal/auth"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/apache/pulsar-client-go/pulsar/log"
)

Expand Down Expand Up @@ -204,21 +201,3 @@ func (c *client) Close() {
c.cnxPool.Close()
c.lookupService.Close()
}

func (c *client) namespaceTopics(namespace string) ([]string, error) {
id := c.rpcClient.NewRequestID()
req := &pb.CommandGetTopicsOfNamespace{
RequestId: proto.Uint64(id),
Namespace: proto.String(namespace),
Mode: pb.CommandGetTopicsOfNamespace_PERSISTENT.Enum(),
}
res, err := c.rpcClient.RequestToAnyBroker(id, pb.BaseCommand_GET_TOPICS_OF_NAMESPACE, req)
if err != nil {
return nil, err
}
if res.Response.Error != nil {
return []string{}, newError(LookupError, res.Response.GetError().String())
}

return res.Response.GetTopicsOfNamespaceResponse.GetTopics(), nil
}
89 changes: 86 additions & 3 deletions pulsar/client_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"testing"
"time"

"github.com/apache/pulsar-client-go/pulsar/internal"

"github.com/apache/pulsar-client-go/pulsar/internal/auth"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -381,11 +383,29 @@ func TestNamespaceTopicsNamespaceDoesNotExit(t *testing.T) {

// fetch from namespace that does not exist
name := generateRandomName()
topics, err := ci.namespaceTopics(fmt.Sprintf("%s/%s", name, name))
topics, err := ci.lookupService.GetTopicsOfNamespace(fmt.Sprintf("%s/%s", name, name), internal.Persistent)
assert.Nil(t, err)
assert.Equal(t, 0, len(topics))
}

func TestNamespaceTopicsNamespaceDoesNotExitWebURL(t *testing.T) {
c, err := NewClient(ClientOptions{
URL: webServiceURL,
})
if err != nil {
t.Errorf("failed to create client error: %+v", err)
return
}
defer c.Close()
ci := c.(*client)

// fetch from namespace that does not exist
name := generateRandomName()
topics, err := ci.lookupService.GetTopicsOfNamespace(fmt.Sprintf("%s/%s", name, name), internal.Persistent)
assert.NotNil(t, err)
assert.Equal(t, 0, len(topics))
}

func TestNamespaceTopics(t *testing.T) {
name := generateRandomName()
namespace := fmt.Sprintf("public/%s", name)
Expand Down Expand Up @@ -421,7 +441,70 @@ func TestNamespaceTopics(t *testing.T) {
defer c.Close()
ci := c.(*client)

topics, err := ci.namespaceTopics(namespace)
topics, err := ci.lookupService.GetTopicsOfNamespace(namespace, internal.Persistent)
if err != nil {
t.Fatal(err)
}
assert.Equal(t, 2, len(topics))

// add a non-persistent topic
topicName := fmt.Sprintf("non-persistent://%s/testNonPersistentTopic", namespace)
client, err := NewClient(ClientOptions{
URL: serviceURL,
})
assert.Nil(t, err)
defer client.Close()

producer, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
})

assert.Nil(t, err)
defer producer.Close()

topics, err = ci.lookupService.GetTopicsOfNamespace(namespace, internal.Persistent)
if err != nil {
t.Fatal(err)
}
assert.Equal(t, 2, len(topics))
}

func TestNamespaceTopicsWebURL(t *testing.T) {
name := generateRandomName()
namespace := fmt.Sprintf("public/%s", name)
namespaceURL := fmt.Sprintf("admin/v2/namespaces/%s", namespace)
err := httpPut(namespaceURL, anonymousNamespacePolicy())
if err != nil {
t.Fatal()
}
defer func() {
_ = httpDelete(fmt.Sprintf("admin/v2/namespaces/%s", namespace))
}()

// create topics
topic1 := fmt.Sprintf("%s/topic-1", namespace)
if err := httpPut("admin/v2/persistent/"+topic1, nil); err != nil {
t.Fatal(err)
}
topic2 := fmt.Sprintf("%s/topic-2", namespace)
if err := httpPut("admin/v2/persistent/"+topic2, namespace); err != nil {
t.Fatal(err)
}
defer func() {
_ = httpDelete("admin/v2/persistent/"+topic1, "admin/v2/persistent/"+topic2)
}()

c, err := NewClient(ClientOptions{
URL: webServiceURL,
})
if err != nil {
t.Errorf("failed to create client error: %+v", err)
return
}
defer c.Close()
ci := c.(*client)

topics, err := ci.lookupService.GetTopicsOfNamespace(namespace, internal.Persistent)
if err != nil {
t.Fatal(err)
}
Expand All @@ -442,7 +525,7 @@ func TestNamespaceTopics(t *testing.T) {
assert.Nil(t, err)
defer producer.Close()

topics, err = ci.namespaceTopics(namespace)
topics, err = ci.lookupService.GetTopicsOfNamespace(namespace, internal.Persistent)
if err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion pulsar/consumer_regex.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ func (c *regexConsumer) unsubscribe(topics []string) {
}

func (c *regexConsumer) topics() ([]string, error) {
topics, err := c.client.namespaceTopics(c.namespace)
topics, err := c.client.lookupService.GetTopicsOfNamespace(c.namespace, internal.Persistent)
if err != nil {
return nil, err
}
Expand Down
56 changes: 55 additions & 1 deletion pulsar/internal/lookup_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,15 @@ type LookupResult struct {
PhysicalAddr *url.URL
}

// GetTopicsOfNamespaceMode for CommandGetTopicsOfNamespace_Mode
type GetTopicsOfNamespaceMode string

const (
Persistent GetTopicsOfNamespaceMode = "PERSISTENT"
NonPersistent = "NON_PERSISTENT"
All = "ALL"
)

// PartitionedTopicMetadata encapsulates a struct for metadata of a partitioned topic
type PartitionedTopicMetadata struct {
Partitions int `json:"partitions"` // Number of partitions for the topic
Expand All @@ -49,7 +58,10 @@ type LookupService interface {
// the given topic, returns the CommandPartitionedTopicMetadataResponse as the result.
GetPartitionedTopicMetadata(topic string) (*PartitionedTopicMetadata, error)

// Allow Lookup Service's internal client to be able to closed
// GetTopicsOfNamespace returns all the topics name for a given namespace.
GetTopicsOfNamespace(namespace string, mode GetTopicsOfNamespaceMode) ([]string, error)

// Closable Allow Lookup Service's internal client to be able to closed
Closable
}

Expand Down Expand Up @@ -209,12 +221,33 @@ func (ls *lookupService) GetPartitionedTopicMetadata(topic string) (*Partitioned
return &partitionedTopicMetadata, nil
}

func (ls *lookupService) GetTopicsOfNamespace(namespace string, mode GetTopicsOfNamespaceMode) ([]string, error) {
id := ls.rpcClient.NewRequestID()
pbMode := pb.CommandGetTopicsOfNamespace_Mode(pb.CommandGetTopicsOfNamespace_Mode_value[string(mode)])
req := &pb.CommandGetTopicsOfNamespace{
RequestId: proto.Uint64(id),
Namespace: proto.String(namespace),
Mode: &pbMode,
}
res, err := ls.rpcClient.RequestToAnyBroker(id, pb.BaseCommand_GET_TOPICS_OF_NAMESPACE, req)
if err != nil {
return nil, err
}
if res.Response.Error != nil {
return []string{}, errors.New(res.Response.GetError().String())
}

return res.Response.GetTopicsOfNamespaceResponse.GetTopics(), nil
}

func (ls *lookupService) Close() {}

const HTTPLookupServiceBasePathV1 string = "/lookup/v2/destination/"
const HTTPLookupServiceBasePathV2 string = "/lookup/v2/topic/"
const HTTPAdminServiceV1Format string = "/admin/%s/partitions"
const HTTPAdminServiceV2Format string = "/admin/v2/%s/partitions"
const HTTPTopicUnderNamespaceV1 string = "/admin/namespaces/%s/destinations?mode=%s"
const HTTPTopicUnderNamespaceV2 string = "/admin/v2/namespaces/%s/topics?mode=%s"

type httpLookupData struct {
BrokerURL string `json:"brokerUrl"`
Expand Down Expand Up @@ -304,6 +337,27 @@ func (h *httpLookupService) GetPartitionedTopicMetadata(topic string) (*Partitio
return tMetadata, nil
}

func (h *httpLookupService) GetTopicsOfNamespace(namespace string, mode GetTopicsOfNamespaceMode) ([]string, error) {

format := HTTPTopicUnderNamespaceV2
if !IsV2Namespace(namespace) {
format = HTTPTopicUnderNamespaceV1
}

path := fmt.Sprintf(format, namespace, string(mode))

topics := []string{}

err := h.httpClient.Get(path, &topics)
if err != nil {
return nil, err
}

h.log.Debugf("Got namespace{%s} mode{%s} topics response: %+v", namespace, mode, topics)

return topics, nil
}

func (h *httpLookupService) Close() {
h.httpClient.Close()
}
Expand Down
26 changes: 26 additions & 0 deletions pulsar/internal/namespace_name.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package internal

import "strings"

func IsV2Namespace(namespace string) bool {
parts := strings.Split(namespace, "/")
// Legacy namespace name that includes cluster name
return len(parts) == 2
}
30 changes: 30 additions & 0 deletions pulsar/internal/namespace_name_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package internal

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestIsV2Namespace(t *testing.T) {
assert.True(t, IsV2Namespace("tenant/default"))
assert.False(t, IsV2Namespace("tenant/cluster/default"))
assert.False(t, IsV2Namespace("default"))
}

0 comments on commit 55c15ef

Please sign in to comment.