Skip to content

Commit

Permalink
feat: support nameserver resolver
Browse files Browse the repository at this point in the history
use nameserver resolver instead of nameserver addr, we can pass
passThrough for direct addr, httpResolver for domain, envResovler
for env param.

Closes #455
  • Loading branch information
xujianhai666 committed Mar 20, 2020
1 parent 1ce9fd9 commit 9f442e3
Show file tree
Hide file tree
Showing 49 changed files with 497 additions and 345 deletions.
1 change: 1 addition & 0 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package rocketmq

import (
"context"

"github.com/pkg/errors"

"github.com/apache/rocketmq-client-go/v2/consumer"
Expand Down
6 changes: 2 additions & 4 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ import (
"sync/atomic"
"time"

"github.com/json-iterator/go"
jsoniter "github.com/json-iterator/go"

"github.com/pkg/errors"
"github.com/tidwall/gjson"

Expand Down Expand Up @@ -273,9 +274,6 @@ type defaultConsumer struct {
}

func (dc *defaultConsumer) start() error {
if len(dc.option.NameServerAddrs) == 0 {
dc.namesrv.UpdateNameServerAddress(dc.option.NameServerDomain, dc.option.InstanceName)
}
if dc.model == Clustering {
// set retry topic
retryTopic := internal.GetRetryTopic(dc.consumerGroup)
Expand Down
3 changes: 2 additions & 1 deletion consumer/mock_offset_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 1 addition & 7 deletions consumer/offset_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"sync"
"time"

"github.com/json-iterator/go"
jsoniter "github.com/json-iterator/go"

"github.com/apache/rocketmq-client-go/v2/internal"
"github.com/apache/rocketmq-client-go/v2/internal/remote"
Expand Down Expand Up @@ -255,12 +255,6 @@ func (r *remoteBrokerOffsetStore) persist(mqs []*primitive.MessageQueue) {
rlog.LogKeyUnderlayError: err.Error(),
"offset": off,
})
} else {
rlog.Info("update offset to broker success", map[string]interface{}{
rlog.LogKeyConsumerGroup: r.group,
rlog.LogKeyMessageQueue: mq.String(),
"offset": off,
})
}
}
}
Expand Down
24 changes: 10 additions & 14 deletions consumer/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ type consumerOptions struct {
//
AutoCommit bool
RebalanceLockInterval time.Duration

Resolver primitive.NsResolver
}

func defaultPushConsumerOptions() consumerOptions {
Expand All @@ -115,6 +117,7 @@ func defaultPushConsumerOptions() consumerOptions {
MaxReconsumeTimes: -1,
ConsumerModel: Clustering,
AutoCommit: true,
Resolver: primitive.NewHttpResolver("DEFAULT"),
}
opts.ClientOptions.GroupName = "DEFAULT_CONSUMER"
return opts
Expand All @@ -125,6 +128,7 @@ type Option func(*consumerOptions)
func defaultPullConsumerOptions() consumerOptions {
opts := consumerOptions{
ClientOptions: internal.DefaultClientOptions(),
Resolver: primitive.NewHttpResolver("DEFAULT"),
}
opts.ClientOptions.GroupName = "DEFAULT_CONSUMER"
return opts
Expand Down Expand Up @@ -179,20 +183,6 @@ func WithInstance(name string) Option {
}
}

// WithNameServer set NameServer address, only support one NameServer cluster in alpha2
func WithNameServer(nameServers primitive.NamesrvAddr) Option {
return func(opts *consumerOptions) {
opts.NameServerAddrs = nameServers
}
}

// WithNameServerDomain set NameServer domain
func WithNameServerDomain(nameServerUrl string) Option {
return func(opts *consumerOptions) {
opts.NameServerDomain = nameServerUrl
}
}

// WithNamespace set the namespace of consumer
func WithNamespace(namespace string) Option {
return func(opts *consumerOptions) {
Expand Down Expand Up @@ -263,3 +253,9 @@ func WithPullInterval(interval time.Duration) Option {
options.PullInterval = interval
}
}

func WithNsResovler(resolver primitive.NsResolver) Option {
return func(options *consumerOptions) {
options.Resolver = resolver
}
}
2 changes: 1 addition & 1 deletion consumer/pull_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func NewPullConsumer(options ...Option) (*defaultPullConsumer, error) {
apply(&defaultOpts)
}

srvs, err := internal.NewNamesrv(defaultOpts.NameServerAddrs)
srvs, err := internal.NewNamesrv(defaultOpts.Resolver)
if err != nil {
return nil, errors.Wrap(err, "new Namesrv failed.")
}
Expand Down
2 changes: 1 addition & 1 deletion consumer/push_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
for _, apply := range opts {
apply(&defaultOpts)
}
srvs, err := internal.NewNamesrv(defaultOpts.NameServerAddrs)
srvs, err := internal.NewNamesrv(defaultOpts.Resolver)
if err != nil {
return nil, errors.Wrap(err, "new Namesrv failed.")
}
Expand Down
5 changes: 3 additions & 2 deletions consumer/push_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ package consumer
import (
"context"
"fmt"
"testing"

"github.com/apache/rocketmq-client-go/v2/internal"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/golang/mock/gomock"
. "github.com/smartystreets/goconvey/convey"
"testing"
)

func mockB4Start(c *pushConsumer) {
Expand All @@ -35,7 +36,7 @@ func TestStart(t *testing.T) {
Convey("test Start method", t, func() {
c, _ := NewPushConsumer(
WithGroupName("testGroup"),
WithNameServer([]string{"127.0.0.1:9876"}),
WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
WithConsumerModel(BroadCasting),
)

Expand Down
3 changes: 2 additions & 1 deletion consumer/strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ package consumer

import (
"fmt"
"testing"

"github.com/apache/rocketmq-client-go/v2/primitive"
. "github.com/smartystreets/goconvey/convey"
"testing"
)

func TestAllocateByAveragely(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion examples/consumer/acl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
func main() {
c, err := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
consumer.WithCredentials(primitive.Credentials{
AccessKey: "RocketMQ",
SecretKey: "12345678",
Expand Down
2 changes: 1 addition & 1 deletion examples/consumer/broadcast/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),
consumer.WithConsumerModel(consumer.BroadCasting),
)
Expand Down
2 changes: 1 addition & 1 deletion examples/consumer/delay/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
)
err := c.Subscribe("TopicTest", consumer.MessageSelector{}, func(ctx context.Context,
msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
Expand Down
2 changes: 1 addition & 1 deletion examples/consumer/interceptor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
consumer.WithConsumerModel(consumer.Clustering),
consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),
consumer.WithInterceptor(UserFistInterceptor(), UserSecondInterceptor()))
Expand Down
2 changes: 1 addition & 1 deletion examples/consumer/namespace/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
func main() {
c, err := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
consumer.WithCredentials(primitive.Credentials{
AccessKey: "RocketMQ",
SecretKey: "12345678",
Expand Down
2 changes: 1 addition & 1 deletion examples/consumer/orderly/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
consumer.WithConsumerModel(consumer.Clustering),
consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),
consumer.WithConsumerOrder(true),
Expand Down
2 changes: 1 addition & 1 deletion examples/consumer/pull/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
func main() {
c, err := rocketmq.NewPullConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
)
if err != nil {
rlog.Fatal(fmt.Sprintf("fail to new pullConsumer: %s", err), nil)
Expand Down
2 changes: 1 addition & 1 deletion examples/consumer/retry/concurrent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
consumer.WithConsumerModel(consumer.Clustering),
)

Expand Down
2 changes: 1 addition & 1 deletion examples/consumer/retry/order/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
consumer.WithConsumerModel(consumer.Clustering),
consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),
consumer.WithConsumerOrder(true),
Expand Down
2 changes: 1 addition & 1 deletion examples/consumer/simple/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
)
err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context,
msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
Expand Down
2 changes: 1 addition & 1 deletion examples/consumer/strategy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
consumer.WithStrategy(consumer.AllocateByAveragely),
)
err := c.Subscribe("TopicTest", consumer.MessageSelector{}, func(ctx context.Context,
Expand Down
2 changes: 1 addition & 1 deletion examples/consumer/tag/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
)
selector := consumer.MessageSelector{
Type: consumer.TAG,
Expand Down
2 changes: 1 addition & 1 deletion examples/consumer/trace/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func main() {

c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNameServer(namesrvs),
consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
consumer.WithTrace(traceCfg),
)
err := c.Subscribe("TopicTest", consumer.MessageSelector{}, func(ctx context.Context,
Expand Down
2 changes: 1 addition & 1 deletion examples/producer/acl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

func main() {
p, err := rocketmq.NewProducer(
producer.WithNameServer([]string{"127.0.0.1:9876"}),
producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(2),
producer.WithCredentials(primitive.Credentials{
AccessKey: "RocketMQ",
Expand Down
2 changes: 1 addition & 1 deletion examples/producer/async/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
// Package main implements a async producer to send message.
func main() {
p, _ := rocketmq.NewProducer(
producer.WithNameServer([]string{"127.0.0.1:9876"}),
producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(2),
producer.WithQueueSelector(producer.NewManualQueueSelector()))

Expand Down
2 changes: 1 addition & 1 deletion examples/producer/batch/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

func main() {
p, _ := rocketmq.NewProducer(
producer.WithNameServer([]string{"127.0.0.1:9876"}),
producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(2),
)
err := p.Start()
Expand Down
2 changes: 1 addition & 1 deletion examples/producer/delay/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (

func main() {
p, _ := rocketmq.NewProducer(
producer.WithNameServer([]string{"127.0.0.1:9876"}),
producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(2),
)
err := p.Start()
Expand Down
2 changes: 1 addition & 1 deletion examples/producer/interceptor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

func main() {
p, _ := rocketmq.NewProducer(
producer.WithNameServer([]string{"127.0.0.1:9876"}),
producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(2),
producer.WithInterceptor(UserFirstInterceptor(), UserSecondInterceptor()),
)
Expand Down
2 changes: 1 addition & 1 deletion examples/producer/namespace/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

func main() {
p, err := rocketmq.NewProducer(
producer.WithNameServer([]string{"127.0.0.1:9876"}),
producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(2),
producer.WithCredentials(primitive.Credentials{
AccessKey: "RocketMQ",
Expand Down
2 changes: 1 addition & 1 deletion examples/producer/simple/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
// Package main implements a simple producer to send message.
func main() {
p, _ := rocketmq.NewProducer(
producer.WithNameServer([]string{"127.0.0.1:9876"}),
producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(2),
)
err := p.Start()
Expand Down
2 changes: 1 addition & 1 deletion examples/producer/tag/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (

func main() {
p, _ := rocketmq.NewProducer(
producer.WithNameServer([]string{"127.0.0.1:9876"}),
producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(2),
)
err := p.Start()
Expand Down
2 changes: 1 addition & 1 deletion examples/producer/trace/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func main() {
}

p, _ := rocketmq.NewProducer(
producer.WithNameServer(namesrvs),
producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(2),
producer.WithTrace(traceCfg))
err := p.Start()
Expand Down
2 changes: 1 addition & 1 deletion examples/producer/transaction/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (dl *DemoListener) CheckLocalTransaction(msg *primitive.MessageExt) primiti
func main() {
p, _ := rocketmq.NewTransactionProducer(
NewDemoListener(),
producer.WithNameServer([]string{"127.0.0.1:9876"}),
producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(1),
)
err := p.Start()
Expand Down
Loading

0 comments on commit 9f442e3

Please sign in to comment.