Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support nameserver resolver #457

Merged
merged 12 commits into from
Apr 26, 2020
6 changes: 4 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ language: go
go:
- "1.11.x"
- "1.12.x"

- "1.13.x"
go_import_path: github.com/apache/rocketmq-client-go/v2

env:
global:
- NAME_SERVER_ADDRESS=127.0.0.1:9876
Expand All @@ -27,7 +26,10 @@ before_script:
- nohup sh bin/mqbroker -n localhost:9876 &

script:
- cd ${TRAVIS_HOME}
- ls -al
- cd ${GOPATH}/src/github.com/apache/rocketmq-client-go/v2
- ls -al
- go fmt ./... && [[ -z `git status -s` ]]
- go mod vendor && go test ./... -coverprofile=coverage.txt -covermode=atomic

Expand Down
1 change: 1 addition & 0 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package rocketmq

import (
"context"

"github.com/pkg/errors"

"github.com/apache/rocketmq-client-go/v2/consumer"
Expand Down
6 changes: 2 additions & 4 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ import (
"sync/atomic"
"time"

"github.com/json-iterator/go"
jsoniter "github.com/json-iterator/go"

"github.com/pkg/errors"
"github.com/tidwall/gjson"

Expand Down Expand Up @@ -273,9 +274,6 @@ type defaultConsumer struct {
}

func (dc *defaultConsumer) start() error {
if len(dc.option.NameServerAddrs) == 0 {
dc.namesrv.UpdateNameServerAddress(dc.option.NameServerDomain, dc.option.InstanceName)
}
if dc.model == Clustering {
// set retry topic
retryTopic := internal.GetRetryTopic(dc.consumerGroup)
Expand Down
3 changes: 2 additions & 1 deletion consumer/mock_offset_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 1 addition & 7 deletions consumer/offset_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"sync"
"time"

"github.com/json-iterator/go"
jsoniter "github.com/json-iterator/go"

"github.com/apache/rocketmq-client-go/v2/internal"
"github.com/apache/rocketmq-client-go/v2/internal/remote"
Expand Down Expand Up @@ -255,12 +255,6 @@ func (r *remoteBrokerOffsetStore) persist(mqs []*primitive.MessageQueue) {
rlog.LogKeyUnderlayError: err.Error(),
"offset": off,
})
} else {
rlog.Info("update offset to broker success", map[string]interface{}{
rlog.LogKeyConsumerGroup: r.group,
rlog.LogKeyMessageQueue: mq.String(),
"offset": off,
})
}
}
}
Expand Down
39 changes: 25 additions & 14 deletions consumer/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ type consumerOptions struct {
//
AutoCommit bool
RebalanceLockInterval time.Duration

Resolver primitive.NsResolver
}

func defaultPushConsumerOptions() consumerOptions {
Expand All @@ -115,6 +117,7 @@ func defaultPushConsumerOptions() consumerOptions {
MaxReconsumeTimes: -1,
ConsumerModel: Clustering,
AutoCommit: true,
Resolver: primitive.NewHttpResolver("DEFAULT"),
}
opts.ClientOptions.GroupName = "DEFAULT_CONSUMER"
return opts
Expand All @@ -125,6 +128,7 @@ type Option func(*consumerOptions)
func defaultPullConsumerOptions() consumerOptions {
opts := consumerOptions{
ClientOptions: internal.DefaultClientOptions(),
Resolver: primitive.NewHttpResolver("DEFAULT"),
}
opts.ClientOptions.GroupName = "DEFAULT_CONSUMER"
return opts
Expand Down Expand Up @@ -179,20 +183,6 @@ func WithInstance(name string) Option {
}
}

// WithNameServer set NameServer address, only support one NameServer cluster in alpha2
func WithNameServer(nameServers primitive.NamesrvAddr) Option {
return func(opts *consumerOptions) {
opts.NameServerAddrs = nameServers
}
}

// WithNameServerDomain set NameServer domain
func WithNameServerDomain(nameServerUrl string) Option {
return func(opts *consumerOptions) {
opts.NameServerDomain = nameServerUrl
}
}

// WithNamespace set the namespace of consumer
func WithNamespace(namespace string) Option {
return func(opts *consumerOptions) {
Expand Down Expand Up @@ -263,3 +253,24 @@ func WithPullInterval(interval time.Duration) Option {
options.PullInterval = interval
}
}

// WithNsResovler set nameserver resolver to fetch nameserver addr
func WithNsResovler(resolver primitive.NsResolver) Option {
return func(options *consumerOptions) {
options.Resolver = resolver
}
}

// WithNameServer set NameServer address, only support one NameServer cluster in alpha2
func WithNameServer(nameServers primitive.NamesrvAddr) Option {
return func(options *consumerOptions) {
options.Resolver = primitive.NewPassthroughResolver(nameServers)
}
}

// WithNameServerDomain set NameServer domain
func WithNameServerDomain(nameServerUrl string) Option {
return func(opts *consumerOptions) {
opts.Resolver = primitive.NewHttpResolver("DEFAULT", nameServerUrl)
}
}
2 changes: 1 addition & 1 deletion consumer/pull_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func NewPullConsumer(options ...Option) (*defaultPullConsumer, error) {
apply(&defaultOpts)
}

srvs, err := internal.NewNamesrv(defaultOpts.NameServerAddrs)
srvs, err := internal.NewNamesrv(defaultOpts.Resolver)
if err != nil {
return nil, errors.Wrap(err, "new Namesrv failed.")
}
Expand Down
2 changes: 1 addition & 1 deletion consumer/push_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
for _, apply := range opts {
apply(&defaultOpts)
}
srvs, err := internal.NewNamesrv(defaultOpts.NameServerAddrs)
srvs, err := internal.NewNamesrv(defaultOpts.Resolver)
if err != nil {
return nil, errors.Wrap(err, "new Namesrv failed.")
}
Expand Down
5 changes: 3 additions & 2 deletions consumer/push_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ package consumer
import (
"context"
"fmt"
"testing"

"github.com/apache/rocketmq-client-go/v2/internal"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/golang/mock/gomock"
. "github.com/smartystreets/goconvey/convey"
"testing"
)

func mockB4Start(c *pushConsumer) {
Expand All @@ -35,7 +36,7 @@ func TestStart(t *testing.T) {
Convey("test Start method", t, func() {
c, _ := NewPushConsumer(
WithGroupName("testGroup"),
WithNameServer([]string{"127.0.0.1:9876"}),
WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
WithConsumerModel(BroadCasting),
)

Expand Down
3 changes: 2 additions & 1 deletion consumer/strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ package consumer

import (
"fmt"
"testing"

"github.com/apache/rocketmq-client-go/v2/primitive"
. "github.com/smartystreets/goconvey/convey"
"testing"
)

func TestAllocateByAveragely(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion examples/consumer/acl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
func main() {
c, err := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
consumer.WithCredentials(primitive.Credentials{
AccessKey: "RocketMQ",
SecretKey: "12345678",
Expand Down
2 changes: 1 addition & 1 deletion examples/consumer/broadcast/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),
consumer.WithConsumerModel(consumer.BroadCasting),
)
Expand Down
2 changes: 1 addition & 1 deletion examples/consumer/delay/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
)
err := c.Subscribe("TopicTest", consumer.MessageSelector{}, func(ctx context.Context,
msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
Expand Down
2 changes: 1 addition & 1 deletion examples/consumer/interceptor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
consumer.WithConsumerModel(consumer.Clustering),
consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),
consumer.WithInterceptor(UserFistInterceptor(), UserSecondInterceptor()))
Expand Down
2 changes: 1 addition & 1 deletion examples/consumer/namespace/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
func main() {
c, err := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
consumer.WithCredentials(primitive.Credentials{
AccessKey: "RocketMQ",
SecretKey: "12345678",
Expand Down
2 changes: 1 addition & 1 deletion examples/consumer/orderly/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
consumer.WithConsumerModel(consumer.Clustering),
consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),
consumer.WithConsumerOrder(true),
Expand Down
2 changes: 1 addition & 1 deletion examples/consumer/pull/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
func main() {
c, err := rocketmq.NewPullConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
)
if err != nil {
rlog.Fatal(fmt.Sprintf("fail to new pullConsumer: %s", err), nil)
Expand Down
2 changes: 1 addition & 1 deletion examples/consumer/retry/concurrent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
consumer.WithConsumerModel(consumer.Clustering),
)

Expand Down
2 changes: 1 addition & 1 deletion examples/consumer/retry/order/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
consumer.WithConsumerModel(consumer.Clustering),
consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),
consumer.WithConsumerOrder(true),
Expand Down
2 changes: 1 addition & 1 deletion examples/consumer/simple/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
)
err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context,
msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
Expand Down
2 changes: 1 addition & 1 deletion examples/consumer/strategy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
consumer.WithStrategy(consumer.AllocateByAveragely),
)
err := c.Subscribe("TopicTest", consumer.MessageSelector{}, func(ctx context.Context,
Expand Down
2 changes: 1 addition & 1 deletion examples/consumer/tag/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
)
selector := consumer.MessageSelector{
Type: consumer.TAG,
Expand Down
2 changes: 1 addition & 1 deletion examples/consumer/trace/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func main() {

c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNameServer(namesrvs),
consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
consumer.WithTrace(traceCfg),
)
err := c.Subscribe("TopicTest", consumer.MessageSelector{}, func(ctx context.Context,
Expand Down
2 changes: 1 addition & 1 deletion examples/producer/acl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

func main() {
p, err := rocketmq.NewProducer(
producer.WithNameServer([]string{"127.0.0.1:9876"}),
producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(2),
producer.WithCredentials(primitive.Credentials{
AccessKey: "RocketMQ",
Expand Down
2 changes: 1 addition & 1 deletion examples/producer/async/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
// Package main implements a async producer to send message.
func main() {
p, _ := rocketmq.NewProducer(
producer.WithNameServer([]string{"127.0.0.1:9876"}),
producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(2),
producer.WithQueueSelector(producer.NewManualQueueSelector()))

Expand Down
2 changes: 1 addition & 1 deletion examples/producer/batch/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

func main() {
p, _ := rocketmq.NewProducer(
producer.WithNameServer([]string{"127.0.0.1:9876"}),
producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(2),
)
err := p.Start()
Expand Down
2 changes: 1 addition & 1 deletion examples/producer/delay/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (

func main() {
p, _ := rocketmq.NewProducer(
producer.WithNameServer([]string{"127.0.0.1:9876"}),
producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(2),
)
err := p.Start()
Expand Down
2 changes: 1 addition & 1 deletion examples/producer/interceptor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

func main() {
p, _ := rocketmq.NewProducer(
producer.WithNameServer([]string{"127.0.0.1:9876"}),
producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(2),
producer.WithInterceptor(UserFirstInterceptor(), UserSecondInterceptor()),
)
Expand Down
2 changes: 1 addition & 1 deletion examples/producer/namespace/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

func main() {
p, err := rocketmq.NewProducer(
producer.WithNameServer([]string{"127.0.0.1:9876"}),
producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(2),
producer.WithCredentials(primitive.Credentials{
AccessKey: "RocketMQ",
Expand Down
2 changes: 1 addition & 1 deletion examples/producer/simple/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
// Package main implements a simple producer to send message.
func main() {
p, _ := rocketmq.NewProducer(
producer.WithNameServer([]string{"127.0.0.1:9876"}),
producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(2),
)
err := p.Start()
Expand Down
2 changes: 1 addition & 1 deletion examples/producer/tag/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (

func main() {
p, _ := rocketmq.NewProducer(
producer.WithNameServer([]string{"127.0.0.1:9876"}),
producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(2),
)
err := p.Start()
Expand Down
Loading