Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Message Transfer with credit>1 will cause dead lock #117

Closed
yockii opened this issue Dec 13, 2021 · 13 comments · Fixed by #210
Closed

Message Transfer with credit>1 will cause dead lock #117

yockii opened this issue Dec 13, 2021 · 13 comments · Fixed by #210
Labels
blocking-release Blocks release bug This issue requires a change to an existing behavior in the product in order to be resolved. customer-reported Issues that are reported by GitHub users external to the Azure organization.

Comments

@yockii
Copy link

yockii commented Dec 13, 2021

i init receiver with session.NewReceiver using amqp.LinkCredit(10), and send message to another queue when received.
the send method will check if sender exist, if not, new a sender with session.NewSender......
that was bad, I cant get the sender for a long while and the message cannot be consumed....

so now, i have to make receiver using option amqp.LinkCredit(1) to resolve

@jhendrixMSFT
Copy link
Member

jhendrixMSFT commented Dec 13, 2021

Is the call to session.NewSender() hanging? Also, has the received message been settled yet?

@jhendrixMSFT
Copy link
Member

I tried to create a repro based on the provided information but couldn't. Could you please provide more detail, and/or a code sample?

@yockii
Copy link
Author

yockii commented Dec 14, 2021

thank you for reply.
Yes, the session.NewSender is hanging.
to reproduce the stage, there should have at lease 10 messages overstock.

the Pseudo code should like this:
`
var senders map[string]*amqp.Sender
// ......
msg, err := receiver.Receive(ctx)
if err != nil {
return err
}

// ....
sender, ok := senders[queue]
if !ok {
sender, err = session.NewSender(opts...)
if err != nil {
return err
}
senders[queue] = sender
}

_ = sender.Send(ctx, msg)

// ....
receiver.AcceptMessage(ctx, msg)
`

@jhendrixMSFT
Copy link
Member

Thanks for the info. I'm still unable to repro this using v0.17.0. In your pseudo-code, are all the operations running on the same goroutine? What are the options you're using to create the sender and receiver? Also, what version are you using?

@yockii
Copy link
Author

yockii commented Dec 14, 2021

Thanks for the info. I'm still unable to repro this using v0.17.0. In your pseudo-code, are all the operations running on the same goroutine? What are the options you're using to create the sender and receiver? Also, what version are you using?

I used v0.13, and I updated to v0.17.0 after this bug, also rewrite some code (some api changed), but the bug still there.
and yes, all the operations running on the same goroutine.
I use these code to create sender and receiver:
session.NewReceiver( amqp.LinkSourceAddress(queue), amqp.LinkCredit(1), )
`var opts []amqp.LinkOption

opts = append(opts, amqp.LinkTargetAddress(queue))

senders[queue], err = mq.session.NewSender(opts...)`

@jhendrixMSFT
Copy link
Member

Here's my repro code. Does any of this look similar to what you're doing? Also, what peer are you connecting to, is it Azure service bus, or something else?

func main() {
	// prime with messages
	client, err := amqp.Dial("amqp://localhost:25672")
	if err != nil {
		panic(err)
	}
	session, err := client.NewSession()
	if err != nil {
		panic(err)
	}
	sender, err := session.NewSender(
		amqp.LinkTargetAddress("helloworld"))
	if err != nil {
		panic(err)
	}
	for i := 0; i < 100; i++ {
		msg := amqp.NewMessage([]byte(fmt.Sprintf("test %d", i)))
		if err = sender.Send(context.Background(), msg); err != nil {
			panic(err)
		}
	}

	// receive one message
	recv, err := session.NewReceiver(
		amqp.LinkSourceAddress("helloworld"),
		amqp.LinkCredit(1))
	if err != nil {
		panic(err)
	}
	msg, err := recv.Receive(context.Background())
	if err != nil {
		panic(err)
	}

	// try to create new sender
	_, err = session.NewSender(
		amqp.LinkTargetAddress("helloworld"))
	if err != nil {
		panic(err)
	}
	fmt.Println(string(msg.GetData()))
	recv.AcceptMessage(context.Background(), msg)
}

@yockii
Copy link
Author

yockii commented Dec 14, 2021

almost the same except the begining, the message producer is another program, so i prefer to have a different session to first create receiver then sender.

and I use activeMQ5 locally.
the project is just in test stage

@jhendrixMSFT
Copy link
Member

I installed ActiveMQ 5.16.3 but still can't repro. I didn't change any config options for ActiveMQ though. Do you use a different configuration?

@yockii
Copy link
Author

yockii commented Dec 14, 2021

Please try this, it will only print 2 message on console

package main

import (
	"context"
	"fmt"

	"github.com/Azure/go-amqp"
)

func main() {
	sendInitMsg()
	lockCode()
}

func lockCode() {
	client, err := amqp.Dial("amqp://localhost:5672", amqp.ConnSASLAnonymous())
	if err != nil {
		panic(err)
	}
	session, err := client.NewSession()
	if err != nil {
		panic(err)
	}
	var sender *amqp.Sender
	for {
		// receive one message
		recv, err := session.NewReceiver(
			amqp.LinkSourceAddress("helloworld"),
			amqp.LinkCredit(10))
		if err != nil {
			panic(err)
		}
		msg, err := recv.Receive(context.Background())
		if err != nil {
			panic(err)
		}

		// try to create new sender
		if sender == nil {
			sender, err = session.NewSender(
				amqp.LinkTargetAddress("helloworld222"))
			if err != nil {
				panic(err)
			}
		}
		fmt.Println(string(msg.GetData()))
		sender.Send(context.Background(), msg)
		recv.AcceptMessage(context.Background(), msg)
	}
}

func sendInitMsg() {
	client, err := amqp.Dial("amqp://localhost:5672", amqp.ConnSASLAnonymous())
	if err != nil {
		panic(err)
	}
	session, err := client.NewSession()
	if err != nil {
		panic(err)
	}
	sender, err := session.NewSender(
		amqp.LinkTargetAddress("helloworld"))
	if err != nil {
		panic(err)
	}
	for i := 0; i < 100; i++ {
		msg := amqp.NewMessage([]byte(fmt.Sprintf("test %d", i)))
		if err = sender.Send(context.Background(), msg); err != nil {
			panic(err)
		}
	}
}

@jhendrixMSFT
Copy link
Member

Thanks this repros for me. My repro wasn't attempting to send a message on the new sender. I'm investigating.

@jhendrixMSFT
Copy link
Member

The issue is with link credit management. In this case, no receiver settlement mode has been specified, so we don't pause receiving transfer frames when the link credit is exhausted.

One thing that stands out, if you require to explicitly settle your received messages (I assume you do since you call recv.AcceptMessage(context.Background(), msg), you need to create your receiver in mode second by including option amqp.LinkReceiverSettle(amqp.ModeSecond). With this I can confirm the bug doesn't repro.

@yockii
Copy link
Author

yockii commented Dec 22, 2021

sorry for long delay, when I tried

recv, err := session.NewReceiver(
		amqp.LinkSourceAddress("helloworld"),
		amqp.LinkCredit(10),
		//amqp.LinkReceiverSettle(amqp.ModeFirst),
		amqp.LinkReceiverSettle(amqp.ModeSecond),
	)
if err != nil {
		panic(err)
	}

some error got:
panic: amqp: receiver settlement mode "second" requested, received "first" from server

@RickWinter RickWinter added customer-reported Issues that are reported by GitHub users external to the Azure organization. bug This issue requires a change to an existing behavior in the product in order to be resolved. labels May 2, 2022
@jhendrixMSFT jhendrixMSFT added the blocking-release Blocks release label Jun 24, 2022
@jhendrixMSFT
Copy link
Member

Fixing in pending release v0.18.1

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
blocking-release Blocks release bug This issue requires a change to an existing behavior in the product in order to be resolved. customer-reported Issues that are reported by GitHub users external to the Azure organization.
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants