From b2c58bb8ae6da628b368c6608b9f9ebdb4a96424 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A4=A7=E5=8F=AF?= Date: Fri, 30 Sep 2022 20:14:14 +0800 Subject: [PATCH] remove duplicate check --- pkg/client/rocketmq/consumer.go | 5 --- pkg/client/rocketmq/init.go | 71 --------------------------------- pkg/client/rocketmq/producer.go | 6 --- 3 files changed, 82 deletions(-) diff --git a/pkg/client/rocketmq/consumer.go b/pkg/client/rocketmq/consumer.go index 35d6b0d010..77e4cbef12 100644 --- a/pkg/client/rocketmq/consumer.go +++ b/pkg/client/rocketmq/consumer.go @@ -46,9 +46,6 @@ type PushConsumer struct { func (conf *ConsumerConfig) Build() *PushConsumer { name := conf.Name - if _, ok := _consumers.Load(name); ok { - xlog.Jupiter().Panic("duplicated load", xlog.String("name", name)) - } xlog.Jupiter().Debug("rocketmq's config: ", xlog.String("name", name), xlog.Any("conf", conf)) @@ -82,7 +79,6 @@ func (conf *ConsumerConfig) Build() *PushConsumer { } }) - _consumers.Store(name, cc) return cc } @@ -91,7 +87,6 @@ func (cc *PushConsumer) Close() { if err != nil { xlog.Jupiter().Warn("consumer close fail", zap.Error(err)) } - _consumers.Delete(cc.name) } func (cc *PushConsumer) WithInterceptor(fs ...primitive.Interceptor) *PushConsumer { diff --git a/pkg/client/rocketmq/init.go b/pkg/client/rocketmq/init.go index b107ef8aa0..121e7cef32 100644 --- a/pkg/client/rocketmq/init.go +++ b/pkg/client/rocketmq/init.go @@ -16,23 +16,15 @@ package rocketmq import ( "fmt" - "net/http" "os" "runtime" - "sync" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/rlog" - "github.com/douyu/jupiter/pkg/application" - "github.com/douyu/jupiter/pkg/governor" "github.com/douyu/jupiter/pkg/xlog" - jsoniter "github.com/json-iterator/go" "go.uber.org/zap" ) -var _producers = &sync.Map{} -var _consumers = &sync.Map{} - func init() { rlog.SetLogLevel("debug") rlog.SetLogger(&mqLogger{xlog.Jupiter()}) @@ -43,67 +35,4 @@ func init() { fmt.Fprint(os.Stderr, "[rocketmq panic recovery]\n", string(stack[:length])) xlog.Jupiter().Error("rocketmq panic recovery", zap.Any("error", i)) } - - governor.HandleFunc("/debug/rocketmq/stats", func(w http.ResponseWriter, r *http.Request) { - type rocketmqStatus struct { - application.RuntimeStats - RocketMQs map[string]interface{} `json:"rocketmqs"` - FlowInfo map[string]FlowInfo `json:"flowInfo"` - } - - var rets = rocketmqStatus{ - RuntimeStats: application.NewRuntimeStats(), - RocketMQs: make(map[string]interface{}), - FlowInfo: make(map[string]FlowInfo), - } - - _producers.Range(func(key interface{}, val interface{}) bool { - name := key.(string) - cc := val.(*Producer) - rets.RocketMQs[name] = map[string]interface{}{ - "role": "producer", - "config": cc.ProducerConfig, - } - rets.FlowInfo[fmt.Sprintf("%s_%s", name, cc.fInfo.GroupType)] = cc.fInfo - return true - }) - - _consumers.Range(func(key interface{}, val interface{}) bool { - name := key.(string) - cc := val.(*PushConsumer) - rets.RocketMQs[name] = map[string]interface{}{ - "config": cc.ConsumerConfig, - "role": "consumer", - } - rets.FlowInfo[name+"_"+cc.fInfo.GroupType] = cc.fInfo - return true - }) - - _ = jsoniter.NewEncoder(w).Encode(rets) - }) -} - -func GetProducer(name string) *Producer { - if ins, ok := _producers.Load(name); ok { - return ins.(*Producer) - } - return nil -} - -// Get ... -func GetConsumer(name string) *PushConsumer { - if ins, ok := _consumers.Load(name); ok { - return ins.(*PushConsumer) - } - - return nil -} - -// Invoker ... -func InvokerProducer(name string) *Producer { - if client := GetProducer(name); client != nil { - return client - } - - return StdNewProducer(name) } diff --git a/pkg/client/rocketmq/producer.go b/pkg/client/rocketmq/producer.go index 6ad3704ece..154dcbce9f 100644 --- a/pkg/client/rocketmq/producer.go +++ b/pkg/client/rocketmq/producer.go @@ -43,10 +43,6 @@ func StdNewProducer(name string) *Producer { func (conf *ProducerConfig) Build() *Producer { name := conf.Name - if _, ok := _producers.Load(name); ok { - xlog.Jupiter().Panic("duplicated load", xlog.String("name", name)) - } - if xdebug.IsDevelopmentMode() { xdebug.PrettyJsonPrint("rocketmq's config: "+name, conf) } @@ -72,7 +68,6 @@ func (conf *ProducerConfig) Build() *Producer { _ = cc.Start() }) - _producers.Store(name, cc) return cc } @@ -126,7 +121,6 @@ func (pc *Producer) Close() error { xlog.Jupiter().Warn("consumer close fail", xlog.Any("error", err.Error())) return err } - _producers.Delete(pc.name) return nil }