Skip to content

Commit

Permalink
feat: admin
Browse files Browse the repository at this point in the history
  • Loading branch information
emberfarkas committed Jan 24, 2024
1 parent 1ad750c commit 4674276
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 31 deletions.
46 changes: 22 additions & 24 deletions client/rabbitmq/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,36 +25,34 @@ func MustNewAdmin(c *RabbitConf) *Admin {
}
}

func (q *Admin) DeclareExchange(conf *AdminExchangeConf, args amqp.Table) error {
return q.channel.ExchangeDeclare(
conf.Name,
conf.Kind,
conf.Durable,
conf.AutoDelete,
conf.Internal,
conf.NoWait,
func (q *Admin) Bind(queueName string, routekey string, exchange, kind string, args amqp.Table) error {
if err := q.channel.ExchangeDeclare(
exchange,
kind,
true, // 持久化
false, // 自动删除
false, // 非系统内部使用
false, //
args,
)
}

func (q *Admin) DeclareQueue(conf *AdminQueueConf, args amqp.Table) error {
_, err := q.channel.QueueDeclare(
conf.Name,
conf.Durable,
conf.AutoDelete,
conf.Exclusive,
conf.NoWait,
); err != nil {
return err
}
queue, err := q.channel.QueueDeclare(
queueName,
true,
false,
false,
false,
args,
)
return err
}

func (q *Admin) Bind(queueName string, routekey string, exchange string, notWait bool, args amqp.Table) error {
if err != nil {
return err
}
return q.channel.QueueBind(
queueName,
queue.Name,
routekey,
exchange,
notWait,
false,
args,
)
}
8 changes: 8 additions & 0 deletions client/rabbitmq/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,17 @@ package rabbitmq

import (
"fmt"
"github.com/streadway/amqp"
"net/url"
)

func (c *RabbitConf) URL() string {
return fmt.Sprintf("amqp://%s:%s@%s:%d%s", c.Username, url.QueryEscape(c.Password), c.Host, c.Port, c.VHost)
}

var (
ExchangeDirect = amqp.ExchangeDirect
ExchangeFanout = amqp.ExchangeFanout
ExchangeTopic = amqp.ExchangeTopic
ExchangeHeaders = amqp.ExchangeHeaders
)
8 changes: 6 additions & 2 deletions client/rocketmq/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,13 @@ func (c *rocketQueue) Stop(context.Context) error {
return nil
}

func (c *rocketQueue) consumGroupTopic(topic, expression string) {
func (c *rocketQueue) consumGroupTopic(topic, expression string) error {
selector := consumer.MessageSelector{Type: consumer.TAG, Expression: expression}
c.sub.Subscribe(topic, selector, c.handleMsg)
err := c.sub.Subscribe(topic, selector, c.handleMsg)
if err != nil {
return err
}
return nil
}

func (c *rocketQueue) handleMsg(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
Expand Down
5 changes: 0 additions & 5 deletions log/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,6 @@ func GetCore() zapcore.Core {
return global.GetLogger().logger.Core()
}

// Log Print log by level and keyvals.
//func Log(level Level, keyvals ...interface{}) {
// global.slogger.l(level, keyvals...)
//}

func With(kv ...interface{}) *ZapLogger {
core := global.ZapLogger.logger.Core()
core = WithCore(core, kv...)
Expand Down

0 comments on commit 4674276

Please sign in to comment.