Skip to content

Commit

Permalink
remove duplicate check
Browse files Browse the repository at this point in the history
  • Loading branch information
sysulq committed Sep 30, 2022
1 parent 3896eaa commit b2c58bb
Show file tree
Hide file tree
Showing 3 changed files with 0 additions and 82 deletions.
5 changes: 0 additions & 5 deletions pkg/client/rocketmq/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,6 @@ type PushConsumer struct {

func (conf *ConsumerConfig) Build() *PushConsumer {
name := conf.Name
if _, ok := _consumers.Load(name); ok {
xlog.Jupiter().Panic("duplicated load", xlog.String("name", name))
}

xlog.Jupiter().Debug("rocketmq's config: ", xlog.String("name", name), xlog.Any("conf", conf))

Expand Down Expand Up @@ -82,7 +79,6 @@ func (conf *ConsumerConfig) Build() *PushConsumer {
}
})

_consumers.Store(name, cc)
return cc
}

Expand All @@ -91,7 +87,6 @@ func (cc *PushConsumer) Close() {
if err != nil {
xlog.Jupiter().Warn("consumer close fail", zap.Error(err))
}
_consumers.Delete(cc.name)
}

func (cc *PushConsumer) WithInterceptor(fs ...primitive.Interceptor) *PushConsumer {
Expand Down
71 changes: 0 additions & 71 deletions pkg/client/rocketmq/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,15 @@ package rocketmq

import (
"fmt"
"net/http"
"os"
"runtime"
"sync"

"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/rlog"
"github.com/douyu/jupiter/pkg/application"
"github.com/douyu/jupiter/pkg/governor"
"github.com/douyu/jupiter/pkg/xlog"
jsoniter "github.com/json-iterator/go"
"go.uber.org/zap"
)

var _producers = &sync.Map{}
var _consumers = &sync.Map{}

func init() {
rlog.SetLogLevel("debug")
rlog.SetLogger(&mqLogger{xlog.Jupiter()})
Expand All @@ -43,67 +35,4 @@ func init() {
fmt.Fprint(os.Stderr, "[rocketmq panic recovery]\n", string(stack[:length]))
xlog.Jupiter().Error("rocketmq panic recovery", zap.Any("error", i))
}

governor.HandleFunc("/debug/rocketmq/stats", func(w http.ResponseWriter, r *http.Request) {
type rocketmqStatus struct {
application.RuntimeStats
RocketMQs map[string]interface{} `json:"rocketmqs"`
FlowInfo map[string]FlowInfo `json:"flowInfo"`
}

var rets = rocketmqStatus{
RuntimeStats: application.NewRuntimeStats(),
RocketMQs: make(map[string]interface{}),
FlowInfo: make(map[string]FlowInfo),
}

_producers.Range(func(key interface{}, val interface{}) bool {
name := key.(string)
cc := val.(*Producer)
rets.RocketMQs[name] = map[string]interface{}{
"role": "producer",
"config": cc.ProducerConfig,
}
rets.FlowInfo[fmt.Sprintf("%s_%s", name, cc.fInfo.GroupType)] = cc.fInfo
return true
})

_consumers.Range(func(key interface{}, val interface{}) bool {
name := key.(string)
cc := val.(*PushConsumer)
rets.RocketMQs[name] = map[string]interface{}{
"config": cc.ConsumerConfig,
"role": "consumer",
}
rets.FlowInfo[name+"_"+cc.fInfo.GroupType] = cc.fInfo
return true
})

_ = jsoniter.NewEncoder(w).Encode(rets)
})
}

func GetProducer(name string) *Producer {
if ins, ok := _producers.Load(name); ok {
return ins.(*Producer)
}
return nil
}

// Get ...
func GetConsumer(name string) *PushConsumer {
if ins, ok := _consumers.Load(name); ok {
return ins.(*PushConsumer)
}

return nil
}

// Invoker ...
func InvokerProducer(name string) *Producer {
if client := GetProducer(name); client != nil {
return client
}

return StdNewProducer(name)
}
6 changes: 0 additions & 6 deletions pkg/client/rocketmq/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@ func StdNewProducer(name string) *Producer {
func (conf *ProducerConfig) Build() *Producer {
name := conf.Name

if _, ok := _producers.Load(name); ok {
xlog.Jupiter().Panic("duplicated load", xlog.String("name", name))
}

if xdebug.IsDevelopmentMode() {
xdebug.PrettyJsonPrint("rocketmq's config: "+name, conf)
}
Expand All @@ -72,7 +68,6 @@ func (conf *ProducerConfig) Build() *Producer {
_ = cc.Start()
})

_producers.Store(name, cc)
return cc
}

Expand Down Expand Up @@ -126,7 +121,6 @@ func (pc *Producer) Close() error {
xlog.Jupiter().Warn("consumer close fail", xlog.Any("error", err.Error()))
return err
}
_producers.Delete(pc.name)
return nil
}

Expand Down

0 comments on commit b2c58bb

Please sign in to comment.