diff --git a/client/rabbitmq/admin.go b/client/rabbitmq/admin.go index 841db7d..12cd0d5 100644 --- a/client/rabbitmq/admin.go +++ b/client/rabbitmq/admin.go @@ -25,36 +25,34 @@ func MustNewAdmin(c *RabbitConf) *Admin { } } -func (q *Admin) DeclareExchange(conf *AdminExchangeConf, args amqp.Table) error { - return q.channel.ExchangeDeclare( - conf.Name, - conf.Kind, - conf.Durable, - conf.AutoDelete, - conf.Internal, - conf.NoWait, +func (q *Admin) Bind(queueName string, routekey string, exchange, kind string, args amqp.Table) error { + if err := q.channel.ExchangeDeclare( + exchange, + kind, + true, // 持久化 + false, // 自动删除 + false, // 非系统内部使用 + false, // args, - ) -} - -func (q *Admin) DeclareQueue(conf *AdminQueueConf, args amqp.Table) error { - _, err := q.channel.QueueDeclare( - conf.Name, - conf.Durable, - conf.AutoDelete, - conf.Exclusive, - conf.NoWait, + ); err != nil { + return err + } + queue, err := q.channel.QueueDeclare( + queueName, + true, + false, + false, + false, args, ) - return err -} - -func (q *Admin) Bind(queueName string, routekey string, exchange string, notWait bool, args amqp.Table) error { + if err != nil { + return err + } return q.channel.QueueBind( - queueName, + queue.Name, routekey, exchange, - notWait, + false, args, ) } diff --git a/client/rabbitmq/conf.go b/client/rabbitmq/conf.go index 811e9b9..76b5f77 100644 --- a/client/rabbitmq/conf.go +++ b/client/rabbitmq/conf.go @@ -2,9 +2,17 @@ package rabbitmq import ( "fmt" + "github.com/streadway/amqp" "net/url" ) func (c *RabbitConf) URL() string { return fmt.Sprintf("amqp://%s:%s@%s:%d%s", c.Username, url.QueryEscape(c.Password), c.Host, c.Port, c.VHost) } + +var ( + ExchangeDirect = amqp.ExchangeDirect + ExchangeFanout = amqp.ExchangeFanout + ExchangeTopic = amqp.ExchangeTopic + ExchangeHeaders = amqp.ExchangeHeaders +) diff --git a/client/rocketmq/consumer.go b/client/rocketmq/consumer.go index 09f9dbc..0006991 100644 --- a/client/rocketmq/consumer.go +++ b/client/rocketmq/consumer.go @@ -141,9 +141,13 @@ func (c *rocketQueue) Stop(context.Context) error { return nil } -func (c *rocketQueue) consumGroupTopic(topic, expression string) { +func (c *rocketQueue) consumGroupTopic(topic, expression string) error { selector := consumer.MessageSelector{Type: consumer.TAG, Expression: expression} - c.sub.Subscribe(topic, selector, c.handleMsg) + err := c.sub.Subscribe(topic, selector, c.handleMsg) + if err != nil { + return err + } + return nil } func (c *rocketQueue) handleMsg(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) { diff --git a/log/global.go b/log/global.go index 1552833..01d0943 100644 --- a/log/global.go +++ b/log/global.go @@ -44,11 +44,6 @@ func GetCore() zapcore.Core { return global.GetLogger().logger.Core() } -// Log Print log by level and keyvals. -//func Log(level Level, keyvals ...interface{}) { -// global.slogger.l(level, keyvals...) -//} - func With(kv ...interface{}) *ZapLogger { core := global.ZapLogger.logger.Core() core = WithCore(core, kv...)