Skip to content

Commit

Permalink
first commit
Browse files Browse the repository at this point in the history
  • Loading branch information
go-gg committed Sep 13, 2023
0 parents commit 925fa80
Show file tree
Hide file tree
Showing 5 changed files with 1,061 additions and 0 deletions.
215 changes: 215 additions & 0 deletions consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
package rocketmq

import (
"context"
"dario.cat/mergo"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/rlog"
"github.com/go-tron/base-error"
"github.com/go-tron/config"
"github.com/go-tron/logger"
"reflect"
"strings"
)

type DelayLevel int

const (
DelayLevel1s DelayLevel = iota + 1
DelayLevel5s
DelayLevel10s
DelayLevel30s
DelayLevel1m
DelayLevel2m
DelayLevel3m
DelayLevel4m
DelayLevel5m
DelayLevel6m
DelayLevel7m
DelayLevel8m
DelayLevel9m
DelayLevel10m
DelayLevel20m
DelayLevel30m
DelayLevel1h
DelayLevel2h
)

// The DelayLevel specify the waiting time that before next reconsume, [1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h]
type Retry struct {
MaxReconsumeTimes int32
RetryDelayLevel DelayLevel
MaxRetryDelayLevel DelayLevel
}

func NewRetry(retryDelayLevel DelayLevel, maxRetryDelayLevel DelayLevel, maxReconsumeTimes int32) *Retry {
if retryDelayLevel > maxRetryDelayLevel {
panic("retryDelayLevel must less than maxRetryDelayLevel")
}
return &Retry{
MaxReconsumeTimes: maxReconsumeTimes,
RetryDelayLevel: retryDelayLevel,
MaxRetryDelayLevel: maxRetryDelayLevel,
}
}
func defaultRetry() *Retry {
return NewRetry(DelayLevel5s, DelayLevel2h, 30)
}

type RetryStrategy interface {
Calculate(times int32) int
}

type defaultRetryStrategy struct {
*ConsumerConfig
}

func (s *defaultRetryStrategy) Calculate(times int32) int {
level := int(times) + int(s.Retry.RetryDelayLevel)
if level > int(s.Retry.MaxRetryDelayLevel) {
level = int(s.Retry.MaxRetryDelayLevel)
}
return level
}

type ConsumerConfig struct {
NameServer string
ConsumerGroup string
Topic string
Tag string
Key string
Model string
Orderly bool
Retry *Retry
RetryStrategy
RLogger logger.Logger
MsgLogger logger.Logger
}

func NewConsumerWithConfig(c *config.Config, consumer Consumer) {
NewConsumer(&ConsumerConfig{
NameServer: c.GetString("rocketmq.nameServer"),
ConsumerGroup: c.GetString("application.name"),
RLogger: logger.NewZapWithConfig(c, "rocketmq-consumer", "error"),
MsgLogger: logger.NewZapWithConfig(c, "mq-consumer", "info"),
}, consumer)
}

type Consumer interface {
Config() *ConsumerConfig
Handler(msg string) error
}

func NewConsumer(config *ConsumerConfig, c Consumer) {
if err := mergo.Merge(config, c.Config()); err != nil {
panic(err)
}
if config == nil {
panic("config 必须设置")
}
if config.NameServer == "" {
panic("NameServer 必须设置")
}
if config.ConsumerGroup == "" {
panic("ConsumerGroup 必须设置")
}
if config.RLogger == nil {
panic("RLogger 必须设置")
}
if config.MsgLogger == nil {
panic("MsgLogger 必须设置")
}

rlog.SetLogger(&Logger{config.RLogger})

if config.Model == "" {
config.Model = "clustering"
}
var model consumer.MessageModel
switch strings.ToLower(config.Model) {
case "broadcasting":
model = consumer.BroadCasting
case "clustering":
model = consumer.Clustering
default:
panic("unknown MessageModel")
}

if config.Retry == nil {
config.Retry = defaultRetry()
}

if config.RetryStrategy == nil {
config.RetryStrategy = &defaultRetryStrategy{
config,
}
}

var opts = []consumer.Option{
consumer.WithNameServer([]string{config.NameServer}),
consumer.WithGroupName(config.ConsumerGroup),
consumer.WithConsumerModel(model),
consumer.WithMaxReconsumeTimes(config.Retry.MaxReconsumeTimes),
}

if config.Orderly {
opts = append(opts,
consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),
consumer.WithConsumerOrder(true))
}

pc, err := rocketmq.NewPushConsumer(opts...)
if err != nil {
panic(err)
}

selector := consumer.MessageSelector{}
if config.Tag != "" {
selector.Type = consumer.TAG
selector.Expression = config.Tag
}
if err := pc.Subscribe(config.Topic, selector, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {

var (
errs []error
minReconsumeTimes = msgs[0].ReconsumeTimes
)
for _, msg := range msgs {
if msg.ReconsumeTimes < minReconsumeTimes {
minReconsumeTimes = msg.ReconsumeTimes
}

body := string(msg.Body)
err := c.Handler(body)

if err != nil && !(reflect.TypeOf(err).String() == "*baseError.Error" && !err.(*baseError.Error).System) {
errs = append(errs, err)
}

config.MsgLogger.Info(body,
config.MsgLogger.Field("topic", msg.Topic),
config.MsgLogger.Field("tag", msg.GetTags()),
config.MsgLogger.Field("keys", msg.GetKeys()),
config.MsgLogger.Field("attempts", msg.ReconsumeTimes),
config.MsgLogger.Field("error", err),
)
}

concurrentCtx, _ := primitive.GetConcurrentlyCtx(ctx)
concurrentCtx.DelayLevelWhenNextConsume = config.RetryStrategy.Calculate(minReconsumeTimes)

if len(errs) == 0 {
return consumer.ConsumeSuccess, nil
} else {
return consumer.ConsumeRetryLater, nil
}
}); err != nil {
panic(err)
}

if err := pc.Start(); err != nil {
panic(err)
}
}
50 changes: 50 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
module github.com/go-tron/rocketmq

go 1.19

require (
dario.cat/mergo v1.0.0
github.com/apache/rocketmq-client-go/v2 v2.1.2
github.com/go-tron/base-error v1.0.0
github.com/go-tron/config v1.0.0
github.com/go-tron/logger v1.0.1
)

require (
github.com/emirpasic/gods v1.12.0 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/go-tron/random v1.0.0 // indirect
github.com/golang/mock v1.4.4 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/konsorten/go-windows-terminal-sequences v1.0.1 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/sirupsen/logrus v1.4.0 // indirect
github.com/spf13/afero v1.9.5 // indirect
github.com/spf13/cast v1.5.1 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/viper v1.16.0 // indirect
github.com/subosito/gotenv v1.4.2 // indirect
github.com/tidwall/gjson v1.13.0 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.10.0 // indirect
go.uber.org/zap v1.25.0 // indirect
golang.org/x/crypto v0.9.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/term v0.8.0 // indirect
golang.org/x/text v0.9.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
stathat.com/c/consistent v1.0.0 // indirect
)
Loading

0 comments on commit 925fa80

Please sign in to comment.