Skip to content

Commit

Permalink
Merge pull request #81 from adriansmares/feature/upgrade-amqp-lib
Browse files Browse the repository at this point in the history
Upgrade go-amqp to v1.0.1
  • Loading branch information
amenzhinsky committed Jul 10, 2023
2 parents 41367c7 + f568b2f commit d4c494d
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 28 deletions.
2 changes: 1 addition & 1 deletion cmd/iothub-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -1066,7 +1066,7 @@ func watchEvents(ctx context.Context, c *iotservice.Client, args []string) error
}

func watchEventHubEvents(ctx context.Context, cs, group string) error {
c, err := eventhub.DialConnectionString(cs)
c, err := eventhub.DialConnectionStringContext(ctx, cs)
if err != nil {
return err
}
Expand Down
32 changes: 23 additions & 9 deletions eventhub/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,32 +78,46 @@ func WithConnOption(key string, value any) Option {
}
}

// Dial connects to the named EventHub and returns a client instance.
func Dial(host, name string, opts ...Option) (*Client, error) {
// DialContext connects to the named EventHub and returns a client instance
// using the provided context.
func DialContext(ctx context.Context, host, name string, opts ...Option) (*Client, error) {
c := &Client{name: name}
for _, opt := range opts {
opt(c)
}

var err error
c.conn, err = amqp.Dial("amqps://"+host, &c.opts)
c.conn, err = amqp.Dial(ctx, "amqps://"+host, &c.opts)
if err != nil {
return nil, err
}
return c, nil
}

// DialConnectionString dials an EventHub instance using the given connection string.
func DialConnectionString(cs string, opts ...Option) (*Client, error) {
// Dial connects to the named EventHub and returns a client instance.
// DEPRECATED: Use DialContext.
func Dial(host, name string, opts ...Option) (*Client, error) {
return DialContext(context.Background(), host, name, opts...)
}

// DialConnectionString dials an EventHub instance using the given connection string
// and the provided context.
func DialConnectionStringContext(ctx context.Context, cs string, opts ...Option) (*Client, error) {
creds, err := ParseConnectionString(cs)
if err != nil {
return nil, err
}
return Dial(creds.Endpoint, creds.EntityPath, append([]Option{
return DialContext(ctx, creds.Endpoint, creds.EntityPath, append([]Option{
WithSASLPlain(creds.SharedAccessKeyName, creds.SharedAccessKey),
}, opts...)...)
}

// DialConnectionString dials an EventHub instance using the given connection string.
// DEPRECATED: Use DialConnectionStringContext.
func DialConnectionString(cs string, opts ...Option) (*Client, error) {
return DialConnectionStringContext(context.Background(), cs, opts...)
}

// Client is an EventHub client.
type Client struct {
name string
Expand Down Expand Up @@ -190,7 +204,7 @@ func (c *Client) Subscribe(
go func(recv *amqp.Receiver) {
defer recv.Close(context.Background())
for {
msg, err := recv.Receive(ctx)
msg, err := recv.Receive(ctx, &amqp.ReceiveOptions{})
if err != nil {
select {
case errc <- err:
Expand Down Expand Up @@ -255,11 +269,11 @@ func (c *Client) getPartitionIDs(ctx context.Context, sess *amqp.Session) ([]str
"name": c.name,
"type": "com.microsoft:eventhub",
},
}); err != nil {
}, &amqp.SendOptions{}); err != nil {
return nil, err
}

msg, err := recv.Receive(ctx)
msg, err := recv.Receive(ctx, &amqp.ReceiveOptions{})
if err != nil {
return nil, err
}
Expand Down
9 changes: 5 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ module github.com/amenzhinsky/iothub
go 1.18

require (
github.com/Azure/go-amqp v0.18.1
github.com/eclipse/paho.mqtt.golang v1.3.5
github.com/Azure/go-amqp v1.0.1
github.com/eclipse/paho.mqtt.golang v1.4.2
)

require (
github.com/gorilla/websocket v1.4.2 // indirect
golang.org/x/net v0.7.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
golang.org/x/net v0.11.0 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
)
17 changes: 10 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
github.com/Azure/go-amqp v0.18.1 h1:D5Ca+uijuTcj5g76sF+zT4OQZcFFY397+IGf/5Ip5Sc=
github.com/Azure/go-amqp v0.18.1/go.mod h1:+bg0x3ce5+Q3ahCEXnCsGG3ETpDQe3MEVnOuT2ywPwc=
github.com/Azure/go-amqp v1.0.1 h1:Jf8OQCKzRDMZ3pCiH4onM7yrhl5curkRSGkRLTyP35o=
github.com/Azure/go-amqp v1.0.1/go.mod h1:+bg0x3ce5+Q3ahCEXnCsGG3ETpDQe3MEVnOuT2ywPwc=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/eclipse/paho.mqtt.golang v1.3.5 h1:sWtmgNxYM9P2sP+xEItMozsR3w0cqZFlqnNN1bdl41Y=
github.com/eclipse/paho.mqtt.golang v1.3.5/go.mod h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc=
github.com/eclipse/paho.mqtt.golang v1.4.2 h1:66wOzfUHSSI1zamx7jR6yMEI5EuHnT1G6rNA5PM12m4=
github.com/eclipse/paho.mqtt.golang v1.4.2/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.11.0 h1:Gi2tvZIJyBtO9SDr1q9h5hEQCp/4L2RQ+ar0qjx2oNU=
golang.org/x/net v0.11.0/go.mod h1:2L/ixqYpgIVXmeoSA/4Lu7BzTG4KIyPIryS4IsOd1oQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down
14 changes: 7 additions & 7 deletions iotservice/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (c *Client) newSession(ctx context.Context) (*amqp.Session, error) {
if c.conn != nil {
return c.conn.NewSession(ctx, nil) // already connected
}
conn, err := amqp.Dial("amqps://"+c.sak.HostName, &amqp.ConnOptions{
conn, err := amqp.Dial(ctx, "amqps://"+c.sak.HostName, &amqp.ConnOptions{
TLSConfig: c.tls,
Properties: map[string]any{"com.microsoft:client-version": userAgent},
})
Expand Down Expand Up @@ -223,11 +223,11 @@ func (c *Client) putToken(
"type": "servicebus.windows.net:sastoken",
"name": c.sak.HostName,
},
}); err != nil {
}, &amqp.SendOptions{}); err != nil {
return err
}

msg, err := recv.Receive(ctx)
msg, err := recv.Receive(ctx, &amqp.ReceiveOptions{})
if err != nil {
return err
}
Expand Down Expand Up @@ -269,7 +269,7 @@ func (c *Client) connectToEventHub(ctx context.Context) (*eventhub.Client, error
tlsCfg := c.tls.Clone()
tlsCfg.ServerName = host

eh, err := eventhub.Dial(host, group,
eh, err := eventhub.DialContext(ctx, host, group,
eventhub.WithTLSConfig(tlsCfg),
eventhub.WithSASLPlain(c.sak.SharedAccessKeyName, c.sak.SharedAccessKey),
eventhub.WithConnOption("com.microsoft:client-version", userAgent),
Expand Down Expand Up @@ -420,7 +420,7 @@ func (c *Client) SendEvent(
if err != nil {
return err
}
return send.Send(ctx, toAMQPMessage(msg))
return send.Send(ctx, toAMQPMessage(msg), &amqp.SendOptions{})
}

// getSendLink caches sender link between calls to speed up sending events.
Expand Down Expand Up @@ -468,7 +468,7 @@ func (c *Client) SubscribeFeedback(ctx context.Context, fn FeedbackHandler) erro
defer recv.Close(context.Background())

for {
msg, err := recv.Receive(ctx)
msg, err := recv.Receive(ctx, &amqp.ReceiveOptions{})
if err != nil {
return err
}
Expand Down Expand Up @@ -536,7 +536,7 @@ func (c *Client) SubscribeFileNotifications(
defer recv.Close(context.Background())

for {
msg, err := recv.Receive(ctx)
msg, err := recv.Receive(ctx, &amqp.ReceiveOptions{})
if err != nil {
return err
}
Expand Down

0 comments on commit d4c494d

Please sign in to comment.