pubsub: automatically recovering RabbitMQ connections #2958

Closed
kmlebedev opened this issue Feb 10, 2021 · 8 comments · Fixed by #2964

Comments

@kmlebedev

kmlebedev commented Feb 10, 2021

Describe the solution you'd like

Currently the connection is established only once, during initialization of the defaultDialer, and there is no way to restore it without restarting the application.

Describe alternatives you've considered

Automatically recovering RabbitMQ connections

Additional context

https://medium.com/@dhanushgopinath/automatically-recovering-rabbitmq-connections-in-go-applications-7795a605ca59

conn, err := amqp.Dial(serverURL)

@kmlebedev
Author

I tried to reconnect, but it is not clear how to reset the saved error:
"Non-retryable error from ReceiveBatch -> permanent error."
https://github.com/google/go-cloud/blob/master/pubsub/pubsub.go#L560

@vangent
Contributor

vangent commented Feb 10, 2021

Just restart the application on any permanent error and it will reconnect.

@kmlebedev
Author

> Just restart the application on any permanent error and it will reconnect.

I can afford to restart the application that receives messages, but restarting the application that sends messages is not acceptable.

Something else needs to be devised, since the reconnection cannot be done on the application side.

@vangent
Contributor

vangent commented Feb 10, 2021

We do already recreate the amqp channel as needed using NotifyClose (search for closec in rabbit.go).

You can create your own URLOpener if you don't want to use the default one:

type URLOpener struct {

Or, use the explicit constructor where you can provide the connection yourself:

func OpenTopic(conn *amqp.Connection, name string, opts *TopicOptions) *pubsub.Topic {

Does that help?

@vangent
Contributor

vangent commented Feb 10, 2021

I suppose defaultConn could save the URL it used and reconnect if the connection reports IsClosed?

https://pkg.go.dev/github.com/streadway/amqp#Connection.IsClosed
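
A minimal sketch of that idea, kept free of external dependencies so it can run offline. The names `reconnectingDialer`, `dialFunc`, `conn` and `fakeConn` are hypothetical and are not part of the Go CDK; the only thing assumed from the real library is that `*amqp.Connection` has an `IsClosed() bool` method, and that the dial function could wrap `amqp.Dial`.

```go
package main

import (
	"fmt"
	"sync"
)

// conn is the subset of a connection this sketch relies on.
// *amqp.Connection (github.com/streadway/amqp) has an IsClosed method
// with this signature.
type conn interface {
	IsClosed() bool
}

// dialFunc opens a new connection to url. With the real library it could
// wrap amqp.Dial; it is a parameter here so the sketch needs no broker.
type dialFunc func(url string) (conn, error)

// reconnectingDialer is a hypothetical helper: it remembers the URL and
// redials whenever the cached connection reports that it is closed.
type reconnectingDialer struct {
	mu   sync.Mutex
	url  string
	dial dialFunc
	cur  conn
}

func newReconnectingDialer(url string, dial dialFunc) *reconnectingDialer {
	return &reconnectingDialer{url: url, dial: dial}
}

// Conn returns the cached connection while it is open; otherwise it dials
// a new one with the saved URL and caches that instead.
func (d *reconnectingDialer) Conn() (conn, error) {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.cur != nil && !d.cur.IsClosed() {
		return d.cur, nil
	}
	c, err := d.dial(d.url)
	if err != nil {
		// The URL is left out of the message because it may hold credentials.
		return nil, fmt.Errorf("redial failed: %w", err)
	}
	d.cur = c
	return c, nil
}

// fakeConn stands in for a real connection in examples and tests.
type fakeConn struct{ closed bool }

func (f *fakeConn) IsClosed() bool { return f.closed }
```

This only covers obtaining a live connection. A topic or subscription that was opened on the old connection still holds a reference to it, so it would have to be reopened as well.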

@kmlebedev
Author

> I suppose defaultConn could save the URL it used and reconnect if the connection reports IsClosed?
>
> https://pkg.go.dev/github.com/streadway/amqp#Connection.IsClosed

I had no problems with that part.
The difficulty is correctly injecting a new connection into the topic structure

type topic struct {

after initialization

@vangent
Contributor

vangent commented Feb 10, 2021

Just create a new Topic using OpenTopic (passing a new Connection) or <your URLOpener>.OpenTopicURL (after updating the Connection field on your URLOpener).

It won't work if you are using the default URLOpener, as it only creates the connection once.
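
As a rough illustration of the application-side approach, here is a dependency-free sketch of a wrapper that opens a fresh topic when a send fails. `reopeningTopic`, `sender`, `openFunc` and `fakeTopic` are hypothetical names; in a real program `openFunc` could dial with `amqp.Dial` and call `rabbitpubsub.OpenTopic` on the new connection, and `Send` would take a context and a `*pubsub.Message` rather than a byte slice. Treating every send error as a reason to reopen is an assumption made for brevity.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// sender is the part of a topic this sketch needs. The real
// *pubsub.Topic.Send takes a context and a *pubsub.Message.
type sender interface {
	Send(body []byte) error
}

// openFunc builds a fresh topic on a fresh connection.
type openFunc func() (sender, error)

// reopeningTopic is a hypothetical application-side wrapper that replaces
// its underlying topic after a failed send.
type reopeningTopic struct {
	mu   sync.Mutex
	open openFunc
	cur  sender
}

// Send tries the current topic; on failure it opens a new topic once and
// retries the message on it.
func (t *reopeningTopic) Send(body []byte) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.cur == nil {
		s, err := t.open()
		if err != nil {
			return err
		}
		t.cur = s
	}
	err := t.cur.Send(body)
	if err == nil {
		return nil
	}
	// Assumption: any error means the old topic is unusable. A real
	// application would likely inspect the error and shut the old topic down.
	s, openErr := t.open()
	if openErr != nil {
		return fmt.Errorf("send failed (%v); reopen failed: %w", err, openErr)
	}
	t.cur = s
	return t.cur.Send(body)
}

// fakeTopic stands in for a real topic so the sketch runs without a broker.
type fakeTopic struct {
	broken bool
	sent   [][]byte
}

func (f *fakeTopic) Send(body []byte) error {
	if f.broken {
		return errors.New("permanent error")
	}
	f.sent = append(f.sent, body)
	return nil
}
```

Retrying after a failed send can deliver a message twice if the first attempt actually reached the broker, so consumers of such a wrapper would need to tolerate duplicates.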

@kmlebedev
Author

> Just create a new Topic using OpenTopic (passing a new Connection) or <your URLOpener>.OpenTopicURL (after updating the Connection field on your URLOpener).
>
> It won't work if you are using the default URLOpener, as it only creates the connection once.

Thanks, I implemented everything on the application side. What kind of PR would you need?
