Skip to content

Commit

Permalink
[ISSUE #86] Add Interceptor for producer and consumer. (#85)
Browse files Browse the repository at this point in the history
* add interceptor

* add log

* add producer example

* refactor code according to new version

* refactor code according to new version

* add example

* fix nil bug

* delete extra code

* delete test code

* add comment. resolves #86

* rename

* rename

* stash

* stash

* fix  bug

* stash

* refactor consumer interceptor

* add example

* add example

* 重构interceptor

* fix typo

* add ctx key

* remove extra code

* add ctx to conusme

* refactor consumer interceptor

* refactor consumer interceptor

* refactor consumer interceptor

* lower case chainInterceptor

* rename println
  • Loading branch information
xujianhai666 authored and zongtanghu committed Jul 6, 2019
1 parent f306bd6 commit 3a3f93b
Show file tree
Hide file tree
Showing 14 changed files with 605 additions and 100 deletions.
76 changes: 76 additions & 0 deletions examples/consumer/interceptor/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"context"
"fmt"
"os"
"time"

"github.com/apache/rocketmq-client-go/internal/consumer"
"github.com/apache/rocketmq-client-go/primitive"
)

func main() {
c, _ := consumer.NewPushConsumer("testGroup", "127.0.0.1:9876",
primitive.WithConsumerModel(primitive.Clustering),
primitive.WithConsumeFromWhere(primitive.ConsumeFromFirstOffset),
primitive.WithChainConsumerInterceptor(UserFistInterceptor(), UserSecondInterceptor()))
err := c.Subscribe("TopicTest", primitive.MessageSelector{}, func(ctx *primitive.ConsumeMessageContext,
msgs []*primitive.MessageExt) (primitive.ConsumeResult, error) {
fmt.Println("subscribe callback: %v", msgs)
return primitive.ConsumeSuccess, nil
})
if err != nil {
fmt.Println(err.Error())
}
// Note: start after subscribe
err = c.Start()
if err != nil {
fmt.Println(err.Error())
os.Exit(-1)
}
time.Sleep(time.Hour)
}

func UserFistInterceptor() primitive.CInterceptor {
return func(ctx context.Context, req, reply interface{}, next primitive.CInvoker) error {
msgCtx, _ := primitive.GetConsumerCtx(ctx)
fmt.Printf("msgCtx: %v, mehtod: %s", msgCtx, primitive.GetMethod(ctx))

msgs := req.([]*primitive.MessageExt)
fmt.Printf("user first interceptor before invoke: %v\n", msgs)
e := next(ctx, msgs, reply)

holder := reply.(*primitive.ConsumeResultHolder)
fmt.Printf("user first interceptor after invoke: %v, result: %v\n", msgs, holder)
return e
}
}

func UserSecondInterceptor() primitive.CInterceptor {
return func(ctx context.Context, req, reply interface{}, next primitive.CInvoker) error {
msgs := req.([]*primitive.MessageExt)
fmt.Printf("user second interceptor before invoke: %v\n", msgs)
e := next(ctx, msgs, reply)
holder := reply.(*primitive.ConsumeResultHolder)
fmt.Printf("user second interceptor after invoke: %v, result: %v\n", msgs, holder)
return e
}
}
10 changes: 3 additions & 7 deletions examples/consumer/main.go → examples/consumer/simple/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,10 @@ import (
)

func main() {
c, _ := consumer.NewPushConsumer("testGroup", primitive.ConsumerOption{
NameServerAddr: "127.0.0.1:9876",
ConsumerModel: primitive.Clustering,
FromWhere: primitive.ConsumeFromFirstOffset,
})
err := c.Subscribe("test", primitive.MessageSelector{}, func(ctx *consumer.ConsumeMessageContext,
c, _ := consumer.NewPushConsumer("testGroup", "127.0.0.1:9876")
err := c.Subscribe("TopicTest", primitive.MessageSelector{}, func(ctx *primitive.ConsumeMessageContext,
msgs []*primitive.MessageExt) (primitive.ConsumeResult, error) {
fmt.Println(msgs)
fmt.Println("subscribe callback: %v", msgs)
return primitive.ConsumeSuccess, nil
})
if err != nil {
Expand Down
74 changes: 74 additions & 0 deletions examples/producer/interceptor/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package main implements a producer with user custom interceptor.
package main

import (
"context"
"fmt"
"os"

"github.com/apache/rocketmq-client-go/internal/producer"
"github.com/apache/rocketmq-client-go/primitive"
)

func main() {
nameServerAddr := "127.0.0.1:9876"
p, _ := producer.NewProducer(nameServerAddr, primitive.WithRetry(2),
primitive.WithChainProducerInterceptor(UserFirstInterceptor(), UserSecondInterceptor()))
err := p.Start()
if err != nil {
fmt.Printf("start producer error: %s", err.Error())
os.Exit(1)
}
for i := 0; i < 10; i++ {
res, err := p.SendSync(context.Background(), &primitive.Message{
//Topic: "test",
Topic: "TopicTest",
Body: []byte("Hello RocketMQ Go Client!"),
})

if err != nil {
fmt.Printf("send message error: %s\n", err)
} else {
fmt.Printf("send message success: result=%s\n", res.String())
}
}
err = p.Shutdown()
if err != nil {
fmt.Printf("shundown producer error: %s", err.Error())
}
}

func UserFirstInterceptor() primitive.PInterceptor {
return func(ctx context.Context, req, reply interface{}, next primitive.PInvoker) error {
fmt.Printf("user first interceptor before invoke: req:%v, reply: %v\n", req, reply)
err := next(ctx, req, reply)
fmt.Printf("user first interceptor after invoke: req: %v, reply: %v \n", req, reply)
return err
}
}

func UserSecondInterceptor() primitive.PInterceptor {
return func(ctx context.Context, req, reply interface{}, next primitive.PInvoker) error {
fmt.Printf("user second interceptor before invoke: req: %v, reply: %v\n", req, reply)
err := next(ctx, req, reply)
fmt.Printf("user second interceptor after invoke: req: %v, reply: %v \n", req, reply)
return err
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,10 @@ import (
"github.com/apache/rocketmq-client-go/primitive"
)

// Package main implements a simple producer to send message.
func main() {
opt := primitive.ProducerOptions{
NameServerAddr: "127.0.0.1:9876",
RetryTimesWhenSendFailed: 2,
}
p, _ := producer.NewProducer(opt)
nameServerAddr := "127.0.0.1:9876"
p, _ := producer.NewProducer(nameServerAddr, primitive.WithRetry(2))
err := p.Start()
if err != nil {
fmt.Printf("start producer error: %s", err.Error())
Expand Down
2 changes: 1 addition & 1 deletion internal/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ type defaultConsumer struct {
state kernel.ServiceState
pause bool
once sync.Once
option primitive.ConsumerOption
option primitive.ConsumerOptions
// key: int, hash(*primitive.MessageQueue)
// value: *processQueue
processQueueTable sync.Map
Expand Down
4 changes: 2 additions & 2 deletions internal/consumer/pull_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@ var (
queueCounterTable sync.Map
)

func NewConsumer(config primitive.ConsumerOption) *defaultPullConsumer {
func NewConsumer(config primitive.ConsumerOptions) *defaultPullConsumer {
return &defaultPullConsumer{
option: config,
}
}

type defaultPullConsumer struct {
state kernel.ServiceState
option primitive.ConsumerOption
option primitive.ConsumerOptions
client *kernel.RMQClient
GroupName string
Model primitive.MessageModel
Expand Down
Loading

0 comments on commit 3a3f93b

Please sign in to comment.