Skip to content

Commit

Permalink
go 1.20
Browse files Browse the repository at this point in the history
  • Loading branch information
emberfarkas committed Jan 23, 2024
1 parent 463136b commit ed105db
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 14 deletions.
2 changes: 1 addition & 1 deletion client/rabbitmq/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type Admin struct {
}

func MustNewAdmin(c *RabbitConf) *Admin {
conn, err := amqp.Dial(c.Address)
conn, err := amqp.Dial(c.URL())
if err != nil {
log.Fatalf("failed to connect rabbitmq, error: %v", err)
}
Expand Down
11 changes: 8 additions & 3 deletions client/rabbitmq/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,19 @@ import (
"time"

"github.com/go-bamboo/pkg/log"
"github.com/go-bamboo/pkg/queue"
"github.com/go-bamboo/pkg/rescue"
"github.com/go-kratos/kratos/v2/errors"
"github.com/streadway/amqp"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
)

type Sender interface {
Name() string
Send(ctx context.Context, header map[string]interface{}, exchange string, routeKey string, msg []byte) error
Close() error
}

type (
RabbitMqSender struct {
c *ProducerConf
Expand All @@ -36,7 +41,7 @@ type (
}
)

func MustNewSender(c *ProducerConf) queue.Sender {
func MustNewSender(c *ProducerConf) Sender {
ctx, cf := context.WithCancel(context.TODO())
sender := &RabbitMqSender{
c: c,
Expand Down Expand Up @@ -112,7 +117,7 @@ func (q *RabbitMqSender) Close() error {
}

func (q *RabbitMqSender) connect() error {
conn, err := amqp.Dial(q.c.Rabbit.Address)
conn, err := amqp.Dial(q.c.Rabbit.URL())
if err != nil {
return err
}
Expand Down
20 changes: 11 additions & 9 deletions client/rocketmq/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,14 @@ func MustNewQueue(c *Conf, handler ConsumeHandler) queue.MessageQueue {

func NewQueue(c *Conf, handler ConsumeHandler) (queue.MessageQueue, error) {
q := rocketQueues{}
cc, err := newKafkaQueue(c, handler)
if err != nil {
log.Error(err)
return nil, err
for i := 0; i < int(c.Conns); i++ {
cc, err := newKafkaQueue(c, handler)
if err != nil {
log.Error(err)
return nil, err
}
q.queues = append(q.queues, cc)
}
q.queues = append(q.queues, cc)
return &q, nil
}

Expand All @@ -68,15 +70,15 @@ func (q rocketQueues) Name() string {
}

func (q rocketQueues) Start(ctx context.Context) error {
for _, queue := range q.queues {
queue.Start(ctx)
for _, qq := range q.queues {
qq.Start(ctx)
}
return nil
}

func (q rocketQueues) Stop(ctx context.Context) error {
for _, queue := range q.queues {
queue.Stop(ctx)
for _, qq := range q.queues {
qq.Stop(ctx)
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/go-bamboo/pkg

go 1.19
go 1.20

require (
cloud.google.com/go/storage v1.30.1
Expand Down

0 comments on commit ed105db

Please sign in to comment.