Skip to content

Commit

Permalink
feat: support nameserver resolver (#457)
Browse files Browse the repository at this point in the history
use nameserver resolver instead of nameserver address, we can pass
passThrough for direct address, httpResolver for a domain, an env resolver
for env param.
  • Loading branch information
xujianhai666 committed Apr 26, 2020
1 parent 0897f19 commit 17a373f
Show file tree
Hide file tree
Showing 50 changed files with 562 additions and 328 deletions.
6 changes: 4 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ language: go
go:
- "1.11.x"
- "1.12.x"

- "1.13.x"
go_import_path: github.com/apache/rocketmq-client-go/v2

env:
global:
- NAME_SERVER_ADDRESS=127.0.0.1:9876
Expand All @@ -27,7 +26,10 @@ before_script:
- nohup sh bin/mqbroker -n localhost:9876 &

script:
- cd ${TRAVIS_HOME}
- ls -al
- cd ${GOPATH}/src/github.com/apache/rocketmq-client-go/v2
- ls -al
- go fmt ./... && [[ -z `git status -s` ]]
- go mod vendor && go test ./... -coverprofile=coverage.txt -covermode=atomic

Expand Down
1 change: 1 addition & 0 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package rocketmq

import (
"context"

"github.com/pkg/errors"

"github.com/apache/rocketmq-client-go/v2/consumer"
Expand Down
6 changes: 2 additions & 4 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ import (
"sync/atomic"
"time"

"github.com/json-iterator/go"
jsoniter "github.com/json-iterator/go"

"github.com/pkg/errors"
"github.com/tidwall/gjson"

Expand Down Expand Up @@ -273,9 +274,6 @@ type defaultConsumer struct {
}

func (dc *defaultConsumer) start() error {
if len(dc.option.NameServerAddrs) == 0 {
dc.namesrv.UpdateNameServerAddress(dc.option.NameServerDomain, dc.option.InstanceName)
}
if dc.model == Clustering {
// set retry topic
retryTopic := internal.GetRetryTopic(dc.consumerGroup)
Expand Down
3 changes: 2 additions & 1 deletion consumer/mock_offset_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 1 addition & 7 deletions consumer/offset_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"sync"
"time"

"github.com/json-iterator/go"
jsoniter "github.com/json-iterator/go"

"github.com/apache/rocketmq-client-go/v2/internal"
"github.com/apache/rocketmq-client-go/v2/internal/remote"
Expand Down Expand Up @@ -255,12 +255,6 @@ func (r *remoteBrokerOffsetStore) persist(mqs []*primitive.MessageQueue) {
rlog.LogKeyUnderlayError: err.Error(),
"offset": off,
})
} else {
rlog.Info("update offset to broker success", map[string]interface{}{
rlog.LogKeyConsumerGroup: r.group,
rlog.LogKeyMessageQueue: mq.String(),
"offset": off,
})
}
}
}
Expand Down
39 changes: 25 additions & 14 deletions consumer/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ type consumerOptions struct {
//
AutoCommit bool
RebalanceLockInterval time.Duration

Resolver primitive.NsResolver
}

func defaultPushConsumerOptions() consumerOptions {
Expand All @@ -115,6 +117,7 @@ func defaultPushConsumerOptions() consumerOptions {
MaxReconsumeTimes: -1,
ConsumerModel: Clustering,
AutoCommit: true,
Resolver: primitive.NewHttpResolver("DEFAULT"),
}
opts.ClientOptions.GroupName = "DEFAULT_CONSUMER"
return opts
Expand All @@ -125,6 +128,7 @@ type Option func(*consumerOptions)
func defaultPullConsumerOptions() consumerOptions {
opts := consumerOptions{
ClientOptions: internal.DefaultClientOptions(),
Resolver: primitive.NewHttpResolver("DEFAULT"),
}
opts.ClientOptions.GroupName = "DEFAULT_CONSUMER"
return opts
Expand Down Expand Up @@ -179,20 +183,6 @@ func WithInstance(name string) Option {
}
}

// WithNameServer set NameServer address, only support one NameServer cluster in alpha2
func WithNameServer(nameServers primitive.NamesrvAddr) Option {
return func(opts *consumerOptions) {
opts.NameServerAddrs = nameServers
}
}

// WithNameServerDomain set NameServer domain
func WithNameServerDomain(nameServerUrl string) Option {
return func(opts *consumerOptions) {
opts.NameServerDomain = nameServerUrl
}
}

// WithNamespace set the namespace of consumer
func WithNamespace(namespace string) Option {
return func(opts *consumerOptions) {
Expand Down Expand Up @@ -263,3 +253,24 @@ func WithPullInterval(interval time.Duration) Option {
options.PullInterval = interval
}
}

// WithNsResovler set nameserver resolver to fetch nameserver addr
func WithNsResovler(resolver primitive.NsResolver) Option {
return func(options *consumerOptions) {
options.Resolver = resolver
}
}

// WithNameServer set NameServer address, only support one NameServer cluster in alpha2
func WithNameServer(nameServers primitive.NamesrvAddr) Option {
return func(options *consumerOptions) {
options.Resolver = primitive.NewPassthroughResolver(nameServers)
}
}

// WithNameServerDomain set NameServer domain
func WithNameServerDomain(nameServerUrl string) Option {
return func(opts *consumerOptions) {
opts.Resolver = primitive.NewHttpResolver("DEFAULT", nameServerUrl)
}
}
2 changes: 1 addition & 1 deletion consumer/pull_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func NewPullConsumer(options ...Option) (*defaultPullConsumer, error) {
apply(&defaultOpts)
}

srvs, err := internal.NewNamesrv(defaultOpts.NameServerAddrs)
srvs, err := internal.NewNamesrv(defaultOpts.Resolver)
if err != nil {
return nil, errors.Wrap(err, "new Namesrv failed.")
}
Expand Down
2 changes: 1 addition & 1 deletion consumer/push_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
for _, apply := range opts {
apply(&defaultOpts)
}
srvs, err := internal.NewNamesrv(defaultOpts.NameServerAddrs)
srvs, err := internal.NewNamesrv(defaultOpts.Resolver)
if err != nil {
return nil, errors.Wrap(err, "new Namesrv failed.")
}
Expand Down
5 changes: 3 additions & 2 deletions consumer/push_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ package consumer
import (
"context"
"fmt"
"testing"

"github.com/apache/rocketmq-client-go/v2/internal"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/golang/mock/gomock"
. "github.com/smartystreets/goconvey/convey"
"testing"
)

func mockB4Start(c *pushConsumer) {
Expand All @@ -35,7 +36,7 @@ func TestStart(t *testing.T) {
Convey("test Start method", t, func() {
c, _ := NewPushConsumer(
WithGroupName("testGroup"),
WithNameServer([]string{"127.0.0.1:9876"}),
WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
WithConsumerModel(BroadCasting),
)

Expand Down
3 changes: 2 additions & 1 deletion consumer/strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ package consumer

import (
"fmt"
"testing"

"github.com/apache/rocketmq-client-go/v2/primitive"
. "github.com/smartystreets/goconvey/convey"
"testing"
)

func TestAllocateByAveragely(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion examples/consumer/acl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
func main() {
c, err := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
consumer.WithCredentials(primitive.Credentials{
AccessKey: "RocketMQ",
SecretKey: "12345678",
Expand Down
2 changes: 1 addition & 1 deletion examples/consumer/broadcast/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),
consumer.WithConsumerModel(consumer.BroadCasting),
)
Expand Down
2 changes: 1 addition & 1 deletion examples/consumer/delay/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
)
err := c.Subscribe("TopicTest", consumer.MessageSelector{}, func(ctx context.Context,
msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
Expand Down
2 changes: 1 addition & 1 deletion examples/consumer/interceptor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
consumer.WithConsumerModel(consumer.Clustering),
consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),
consumer.WithInterceptor(UserFistInterceptor(), UserSecondInterceptor()))
Expand Down
2 changes: 1 addition & 1 deletion examples/consumer/namespace/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
func main() {
c, err := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
consumer.WithCredentials(primitive.Credentials{
AccessKey: "RocketMQ",
SecretKey: "12345678",
Expand Down
2 changes: 1 addition & 1 deletion examples/consumer/orderly/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
consumer.WithConsumerModel(consumer.Clustering),
consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),
consumer.WithConsumerOrder(true),
Expand Down
2 changes: 1 addition & 1 deletion examples/consumer/pull/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
func main() {
c, err := rocketmq.NewPullConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
)
if err != nil {
rlog.Fatal(fmt.Sprintf("fail to new pullConsumer: %s", err), nil)
Expand Down
2 changes: 1 addition & 1 deletion examples/consumer/retry/concurrent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
consumer.WithConsumerModel(consumer.Clustering),
)

Expand Down
2 changes: 1 addition & 1 deletion examples/consumer/retry/order/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
consumer.WithConsumerModel(consumer.Clustering),
consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),
consumer.WithConsumerOrder(true),
Expand Down
2 changes: 1 addition & 1 deletion examples/consumer/simple/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
)
err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context,
msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
Expand Down
2 changes: 1 addition & 1 deletion examples/consumer/strategy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
consumer.WithStrategy(consumer.AllocateByAveragely),
)
err := c.Subscribe("TopicTest", consumer.MessageSelector{}, func(ctx context.Context,
Expand Down
2 changes: 1 addition & 1 deletion examples/consumer/tag/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
)
selector := consumer.MessageSelector{
Type: consumer.TAG,
Expand Down
2 changes: 1 addition & 1 deletion examples/consumer/trace/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func main() {

c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNameServer(namesrvs),
consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
consumer.WithTrace(traceCfg),
)
err := c.Subscribe("TopicTest", consumer.MessageSelector{}, func(ctx context.Context,
Expand Down
2 changes: 1 addition & 1 deletion examples/producer/acl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

func main() {
p, err := rocketmq.NewProducer(
producer.WithNameServer([]string{"127.0.0.1:9876"}),
producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(2),
producer.WithCredentials(primitive.Credentials{
AccessKey: "RocketMQ",
Expand Down
2 changes: 1 addition & 1 deletion examples/producer/async/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
// Package main implements a async producer to send message.
func main() {
p, _ := rocketmq.NewProducer(
producer.WithNameServer([]string{"127.0.0.1:9876"}),
producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(2),
producer.WithQueueSelector(producer.NewManualQueueSelector()))

Expand Down
2 changes: 1 addition & 1 deletion examples/producer/batch/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

func main() {
p, _ := rocketmq.NewProducer(
producer.WithNameServer([]string{"127.0.0.1:9876"}),
producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(2),
)
err := p.Start()
Expand Down
2 changes: 1 addition & 1 deletion examples/producer/delay/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (

func main() {
p, _ := rocketmq.NewProducer(
producer.WithNameServer([]string{"127.0.0.1:9876"}),
producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(2),
)
err := p.Start()
Expand Down
2 changes: 1 addition & 1 deletion examples/producer/interceptor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

func main() {
p, _ := rocketmq.NewProducer(
producer.WithNameServer([]string{"127.0.0.1:9876"}),
producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(2),
producer.WithInterceptor(UserFirstInterceptor(), UserSecondInterceptor()),
)
Expand Down
2 changes: 1 addition & 1 deletion examples/producer/namespace/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

func main() {
p, err := rocketmq.NewProducer(
producer.WithNameServer([]string{"127.0.0.1:9876"}),
producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(2),
producer.WithCredentials(primitive.Credentials{
AccessKey: "RocketMQ",
Expand Down
2 changes: 1 addition & 1 deletion examples/producer/simple/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
// Package main implements a simple producer to send message.
func main() {
p, _ := rocketmq.NewProducer(
producer.WithNameServer([]string{"127.0.0.1:9876"}),
producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(2),
)
err := p.Start()
Expand Down
2 changes: 1 addition & 1 deletion examples/producer/tag/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (

func main() {
p, _ := rocketmq.NewProducer(
producer.WithNameServer([]string{"127.0.0.1:9876"}),
producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(2),
)
err := p.Start()
Expand Down
Loading

0 comments on commit 17a373f

Please sign in to comment.