
[Question] Best way to automatically reconnect? #179

Open · kofalt opened this issue Jun 3, 2019 · 28 comments

@kofalt

kofalt commented Jun 3, 2019

I was wondering if this pattern matches your usage for a WAMP client that should try to always maintain a healthy connection. Do you do something differently? If so, would it make sense to stick something like the below in the wiki?

The rough approach I use:

import (
	// ...

	"time"

	"github.com/gammazero/nexus/client"
	"github.com/gammazero/nexus/wamp"
)

var Address string
var RetryDuration time.Duration

var Session *client.Client

// Start a WAMP connection and keep it connected.
func start() {
	Session = resilientConnect()
	go watchdog()
}

// Attempt a WAMP connection until it succeeds
func resilientConnect() *client.Client {
	var wampClient *client.Client
	var err error

	for wampClient == nil || err != nil {
		wampClient, err = client.ConnectNet(Address, client.ClientConfig{
			// ...
		})

		if err != nil {
			// Log connection failure...

			time.Sleep(RetryDuration)
		}
	}

	// Log connection success ...

	return wampClient
}

// Wait for the WAMP connection to fail, then reconnect.
func watchdog() {
	for {
		// Wait for a connection failure
		<-Session.Done()

		// Log broken connection... 
		
		Session = resilientConnect()
	}
}
@martin31821
Contributor

In our case, we just exit when we detect a connection loss and rely on the container runtime to restart our app.

@kofalt
Author

kofalt commented Jun 3, 2019

Hah - makes sense :)

I ended up modifying this logic to use sync/atomic.Value, such that a watchdog-like goroutine can kick off a new connection without waiting for all in-flight uses of Session.Call and the like to exit. Previously I used a sync.RWMutex, but Value seems better suited since it doesn't block the reconnect.

I've been debating whether to ask for a Client.Reconnect method. On one hand, Reconnect might make concurrent use easier when dealing with disconnects; on the other, requiring a new ConnectNet call enforces the idea that any session state and teardown must be handled robustly.

I am not sure how to use github wikis, but since this ticket is searchable, we can probably just close it.

@gammazero
Owner

@kofalt In a commercial product that uses nexus, there is code which is very similar to what you have above. So, this is a useful pattern and at least worth documenting. I am trying to decide if adding something like this to the client library is correct, or if this should only be part of the application implementing a client.

@dcarbone
Contributor

dcarbone commented Jun 4, 2019

For my own product I ended up wrapping the client entirely within my own, with an internal structure that looks a bit like this:

type Client struct {
	mu sync.RWMutex

	done   chan struct{}
	closed bool

	logger *log.Logger
	debug  bool

	realmName cw2.RealmName // realm to join

	// these track topic subscriptions and procedure registrations that we wish to have applied each time the underlying
	// client is created, either due to initialization or socket close -> open cycle
	resilientSubscriptions   map[string]TopicSubscription // topics to automatically subscribe to
	resilientSubscriptionsMu sync.Mutex
	resilientRegistrations   map[string]ProcedureRegistration // procedures to automatically register
	resilientRegistrationsMu sync.Mutex

	// underlying client configuration
	tlsConfig         *tls.Config       // optional tls configuration to use
	connectionType    ConnectionType    // what type of client to maintain
	serializationType SerializationType // what type of serialization to use
	bootstrapFailMode BootstrapFailMode // how we should handle bootstrap errors
	outboundQueueSize int               // size of outbound ws frame buffer
	nexusClient       *nclient.Client   // nexus client
	nexusClientClosed chan struct{}     // if there is a value here, the nexus client was closed for some external reason.
}

This provided me with a few benefits:

  1. I can utilize a Raw or Web socket connection with the same overarching Client type
  2. Upon Nexus client connection, this Client will automatically attempt to join a specific Realm
  3. I maintain a list of topics and procedures that, on Nexus Client connect, are automatically subscribed to / registered.
  4. The implementors of my Client do not need to maintain their own Client state, unless they explicitly Close() it, at which time it will become defunct

My implementation has been running very successfully for a few months now, and this topic discussion has me of a mind to possibly contribute to a ResilientClient implementation.

If there is interest I can start on that this weekend.

@kofalt
Author

kofalt commented Jun 4, 2019

That looks great @dcarbone. Have a gander at atomic.Value when you get a chance; I swapped to that over sync.RWMutex when replacing the Client and like it a lot better. I would review a ResilientClient patch with interest.

@kofalt
Author

kofalt commented Jun 4, 2019

For reference, here's roughly what I ended up with after some improvements:

// Type changed from *client.Client to atomic.Value
var Session atomic.Value

// start() same as before

// resilientConnect() same as before, but calls Session.Store

func watchdog() {

	// same as before, except now calls Load:

	<-Session.Load().(*client.Client).Done()

	// ..
}

I then added a new function:

// GetClient blocks until an allegedly-connected client session is available.
func GetClient() *client.Client {
	for {
		maybeSession := Session.Load()

		if maybeSession != nil {
			session := maybeSession.(*client.Client)
			open := true

			// Check session status
			select {
			case <-session.Done():
				open = false
			default:
			}

			if open {
				return session
			}
		}

		// Wait for retry...
		time.Sleep(RetryDuration)
	}
}

Then, any WAMP interaction first calls GetClient and receives a probably-valid session.

This has the advantage of allowing you to renegotiate new connections while the old one is still failing; functions involving Session.Call/etc can shut down concurrently with a new connection being made.

I would like to pass a context to GetClient so you could give up after some period.
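For example, a context-aware variant could look roughly like this (untested sketch, reusing RetryDuration from the first snippet):

// GetClientCtx is like GetClient, but gives up once ctx is done.
func GetClientCtx(ctx context.Context) (*client.Client, error) {
	for {
		if maybeSession := Session.Load(); maybeSession != nil {
			session := maybeSession.(*client.Client)

			// Check session status
			select {
			case <-session.Done():
				// Dead session; fall through and retry.
			default:
				return session, nil
			}
		}

		// Wait for retry, or give up when the context is done.
		select {
		case <-time.After(RetryDuration):
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
}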

@martin31821
Contributor

Maybe something like this could also be integrated into our high-level 'service' library: service...

@dcarbone
Contributor

dcarbone commented Jun 5, 2019

@kofalt @martin31821: Alright, I will have something by Saturday. Looks like it's going to rain here all weekend, so perfect timing :)

@gammazero
Owner

I have time this weekend to collaborate on design discussions, code reviews, and help with development here, so count me in!

Previously I mentioned a resilient client wrapper that wraps the nexus client, so I will share a stripped-down likeness of the code to give some ideas of what was done for a production system. The wrapper maintains a connected nexus client. If the client is disconnected, then the wrapper creates a new connected client. The wrapper also presents a channel-based interface. For example, a Publish() method returns a channel that the caller can use for publishing messages, similar for others like Subscribe(), Register(), etc. The channel is serviced by the same goroutine that handles reconnection, which allows the wrapper to avoid race conditions between connecting a new client and handling communications.

Here is some skeleton code to provide ideas:

type ResilientClient struct {
    realm     string
    routerURI url.URL
    logger    *log.Logger
    session   *client.Client
    acts      chan func()
    once      sync.Once
}

func (c *ResilientClient) Run(ctx context.Context) {
    c.once.Do(func() {
        go c.run(ctx)
    })
}

func (c *ResilientClient) run(ctx context.Context) {
    var (
        newClientTimer *time.Timer
        err            error
    )
    defer func() {
        if newClientTimer != nil {
            newClientTimer.Stop()
        }
        if c.session != nil {
            c.session.Close()
        }
    }()

    for {
        var (
            currClientDied   <-chan struct{}
            newClientTimeout <-chan time.Time
        )

        // Create a new client if first time or if old one died.                
        if c.session == nil {
            cfg := client.Config{
                Realm:  c.realm,
                Logger: c.logger,
                // Configure other session details
                //HelloDetails: ...
            }
            c.session, err = client.ConnectNet(c.routerURI.String(), cfg)
            if err != nil {
                log.Println(err)
            } else {
                // Setup publishers, subscribers, etc.                          
                //...                                                           
            }
        }

        if c.session != nil {
            // Current client: monitor for premature death.                     
            currClientDied = c.session.Done()
        } else {
            // No client: set timer to create one after a short delay.          
            if newClientTimer == nil {
                newClientTimer = time.NewTimer(time.Second)
            } else {
                newClientTimer.Reset(time.Second)
            }
            newClientTimeout = newClientTimer.C
        }

    Loop:
        for {
            select {
            case act := <-c.acts:
                // Invoke an action delegate: Publish, Subscribe, etc.          
                act()
            case <-currClientDied:
                // The current client has died, force creation of a new one     
                c.session.Close()
                c.session = nil
                break Loop
            case <-newClientTimeout:
                // It is time to retry new client creation                      
                break Loop
            case <-ctx.Done():
                // This goroutine is done                                       
                return
            }
        }
    }
}

// Publish returns a channel for the caller to use for publishing events.       
func (c *ResilientClient) Publish() chan *Message {
    messages := make(chan *Message)
    go func() {
        for message := range messages {
            c.acts <- func() {
                c.publish(message)
            }
        }
    }()
    return messages
}

func (c *ResilientClient) publish(msg *Message) {
    if c.session == nil {
        return
    }
    err := c.session.Publish(msg.Topic, msg.Options, msg.Args, msg.KwArgs)
    if err != nil {
        log.Printf("Publish failure (router=%s) (realm=%s) (topic=%s): %s",
            c.routerURI.String(), c.realm, msg.Topic, err)
        // Close the dead session; the run loop will reconnect.
        c.session.Close()
    }
}

@dcarbone
Contributor

dcarbone commented Jun 7, 2019

I have a branch in works: https://github.com/dcarbone/nexus/tree/feature/resilient-client

Let me know what you think. One big thing for me is to not block while the client is down; implementors should be allowed to determine how they want to handle things, I believe.

@dcarbone
Contributor

dcarbone commented Jun 7, 2019

https://github.com/dcarbone/nexus/blob/feature/resilient-client/client/resilient_client.go contains the code.

There are a number of things I would like to clean up and address, namely using determinable errors to differentiate Closed from Disconnected, plus some other misc cleanup, but something very similar to this is what I wrote for use in my own projects, and it works quite well.

Input welcome @gammazero @kofalt @martin31821

@gammazero
Owner

gammazero commented Jun 7, 2019

@dcarbone Yes, many similarities. Some implementation diffs like RWLock vs goroutine for synchronization, but basically the same idea.

I agree that having determinable errors is beneficial. Similar to a Context, the nexus client already has a Done() method. I will add an Err() method to describe why the done signal is set, and that will return a specific error value, again like Context (https://golang.org/src/context/context.go#L155). Something similar should be done for cancelable calls.
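To illustrate (hypothetical, since neither Err() nor an ErrClosed sentinel exists yet), caller-side handling might look like:

    <-session.Done()

    // Err() would report why the done signal was set, analogous to
    // context.Context's Err().
    switch err := session.Err(); err {
    case client.ErrClosed:
        return // closed deliberately; do not reconnect
    default:
        log.Println("session ended unexpectedly:", err)
        // trigger reconnect...
    }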

Tonight, I will put together a list of design goals, which seem shared by our client implementations, that we can discuss. We should also consider what kind of overall interface we want, like calling methods, or writing/reading messages over channels.

@martin31821
Contributor

What's your idea or plan with regard to automatically reestablishing previously used registrations and subscriptions? Should those be reestablished?
And what about dynamically established subscriptions/registrations?

@dcarbone
Contributor

dcarbone commented Jun 8, 2019

@martin31821: I retain a separate list of what I've coined "Resilient" procedures and topics. Upon underlying client connection, I call this method, which will attempt to re-subscribe / re-register as necessary. One thing that still needs some work is how to handle errors during these calls when they are being executed asynchronously post-initialization.

Calling Unsubscribe and Unregister will remove any resilient subs / regs from their respective lists.
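Roughly, that re-apply step boils down to something like this (condensed sketch; the TopicSubscription / ProcedureRegistration field names are illustrative, not the actual branch code):

// reapply re-subscribes and re-registers everything marked resilient.
// Called each time a new underlying nexus client connects.
func (c *Client) reapply(nc *nclient.Client) error {
	c.resilientSubscriptionsMu.Lock()
	for topic, sub := range c.resilientSubscriptions {
		if err := nc.Subscribe(topic, sub.Handler, sub.Options); err != nil {
			c.resilientSubscriptionsMu.Unlock()
			return fmt.Errorf("re-subscribe %q: %v", topic, err)
		}
	}
	c.resilientSubscriptionsMu.Unlock()

	c.resilientRegistrationsMu.Lock()
	for proc, reg := range c.resilientRegistrations {
		if err := nc.Register(proc, reg.Handler, reg.Options); err != nil {
			c.resilientRegistrationsMu.Unlock()
			return fmt.Errorf("re-register %q: %v", proc, err)
		}
	}
	c.resilientRegistrationsMu.Unlock()
	return nil
}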

@martin31821
Contributor

@dcarbone that seems like a fairly good approach to me. I'd also like to see a callback to clear local state when the connection is lost. How are publications handled when the client is in the reconnecting state? And how are errors handled when the service has another auth role after it reconnects?

@dcarbone
Contributor

dcarbone commented Jun 8, 2019

@martin31821 at the moment anything done during reconnect is blocked by virtue of the reconnect happening within a Lock(), which I could see an argument for modifying. Any actions taken when the client is nil will result in an error return.

A callback is a good idea, I will work on one.

@gammazero
Owner

@dcarbone Regarding "Any actions taken when the client is nil will result in an error return": I am thinking it is preferable to block the caller until the reconnection happens. Otherwise, this forces the caller to implement resiliency in the form of retries, which defeats the purpose of a resilient client.

@dcarbone
Contributor

dcarbone commented Jun 8, 2019

@gammazero I disagree. WAMP messages should be ephemeral, and blocking the entire process without at least providing the implementor some way to cancel the attempt after a period of time is not a good model. There are a litany of reasons the connection might fail to be established (network issues, external auth service down, etc.), and there is no guarantee that a connection will be re-established while the message still has relevance.

The primary benefit of this client is reducing the amount of boilerplate each individual implementor has to write when utilizing WAMP in their product.

@gammazero
Owner

gammazero commented Jun 8, 2019

Design Ideas

The overall goal is to provide a client that recovers from temporary network outages as invisibly as possible. A secondary goal is to reduce the amount of boilerplate code needed to implement a client.

  1. Client will automatically reconnect to router/realm following a disconnect.
    • Will continue to retry at some interval until reconnected or terminated.
  2. Upon reconnect, client will re-subscribe to all subscriptions in place before disconnection.
  3. Upon reconnect, client will re-register all RPCs in place before disconnection.
  4. While disconnected, actions taken block until reconnect, or until canceled.
  5. Blocked calls may be canceled (using a context CancelFunc), and will return a differentiable error (ErrCanceled); see the sketch after this list.
  6. A resilient client may be closed at any time. While connected, behavior is the same as a normal client. When disconnected, any blocked calls will return ErrClosed and the client will stop trying to reconnect.
  7. Once a client is closed, it may not be reconnected. A new client must be created.
  8. Ability to recover event history automatically (requires router support).
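For example, items 4 and 5 from the caller's perspective (the resilient client's Call signature and ErrCanceled are proposed here, not existing API):

    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    // Blocks while disconnected; completes once reconnected, or returns
    // ErrCanceled when the context expires first.
    result, err := rc.Call(ctx, "com.example.procedure", nil, nil, nil, "")
    if err == ErrCanceled {
        // Gave up waiting for reconnection.
    }
    _ = result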

@gammazero
Owner

@dcarbone I was thinking that blocking would be handled by having the caller provide a context to be able to cancel or timeout a blocked call.

@dcarbone
Contributor

dcarbone commented Jun 8, 2019

@gammazero I could see that as a course of action; however, I think it may be difficult to pass the context all the way down to the socket transmission layer in an elegant manner. We would have to either carry a context.Context in each message or modify quite a bit of the codebase.

I looked into this quite some time ago and came to the conclusion that it was more work than I felt made sense at the time. I was also less familiar with Go then, so it's possible I am misremembering the amount of work that would be involved.

@gammazero
Owner

It may be worth another look (I don't have an answer either), since a context "carries deadlines, cancelation signals, and other request-scoped values across API boundaries". If it is the right thing to do then we can always do a major API revision.

@kofalt
Author

kofalt commented Jun 8, 2019

Catching up on this thread.


1: The "Design Ideas" comment LGTM; I would probably want to add something about custom re-subscription behavior.

Consider the use case of watching an append-only log, like a chatroom. A chat client probably wants to subscribe to chat events, wait for the subscription to be acknowledged (is there a way to do that in WAMP?), then make an RPC fetching recent chat history. The client then de-duplicates any chat events that overlap with the chat history, and displays the chat. Otherwise, you'd have a race condition where you could miss some chat lines.
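To make that ordering concrete, here is a sketch (topic/procedure names, the "id" field, and the history payload shape are all invented; I believe nexus's Subscribe only returns after the router acknowledges, which covers the wait-for-ack step):

// watchChat subscribes first, then backfills history, de-duplicating the overlap.
func watchChat(ctx context.Context, session *client.Client) error {
	var mu sync.Mutex
	seen := make(map[string]bool) // chat-line IDs already displayed

	display := func(id, text string) {
		mu.Lock()
		defer mu.Unlock()
		if !seen[id] {
			seen[id] = true
			fmt.Println(text)
		}
	}

	// 1. Subscribe first, so no live events are missed after the ack.
	onEvent := func(args wamp.List, kwargs wamp.Dict, details wamp.Dict) {
		display(kwargs["id"].(string), kwargs["text"].(string))
	}
	if err := session.Subscribe("chat.message", onEvent, nil); err != nil {
		return err
	}

	// 2. Then fetch recent history. Lines that also arrived as live events
	// overlap with the history and are dropped by the seen map.
	result, err := session.Call(ctx, "chat.history", nil, nil, nil, "") // "" = default cancel mode
	if err != nil {
		return err
	}
	for _, arg := range result.Arguments {
		line := arg.(map[string]interface{})
		display(line["id"].(string), line["text"].(string))
	}
	return nil
}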

For a resilient WAMP client, you would want to make the same chat-history RPC to see if you missed any chatter while you were disconnected. Maybe callbacks are enough to handle this scenario?

type ResilientClient struct {

    // ...

    // Fires when the session fails and resilient reconnection is about to begin.
    // Return false to cancel the reconnect.
    OnDisconnect func() bool

    // Fires when the session is healthy again.
    OnReconnect func(client ResilientClient)
}

I made the func signatures up - maybe they'd need to take some other / useful parameters. And they probably wouldn't be exported on the struct itself but instead part of ClientConfig or something. You get the idea.

If something like this pattern is used, you wouldn't need to re-subscribe based on a remembered topic list. That would be left to the callback to call client.Subscribe() - or not, as business logic dictates.

Maybe you don't want to re-subscribe to a topic if your connection has failed five times in the last hour, or some other graceful-degradation / circuit-breaker behavior, etc...


2: Seconded on the context approach; I'm using contexts all over the place to manage a highly-concurrent server task and it is exceedingly useful to coordinate semantic chunks of logic that do not acknowledge or understand each other.

@dcarbone
Contributor

dcarbone commented Jun 9, 2019 via email

@kofalt
Author

kofalt commented Jun 9, 2019

A literal shower thought after vegetating on this more - my proposed OnReconnect handler would be more appropriately used as OnConnect instead, explicitly firing on the first and every subsequent connection.

Then, it's trivial to get @dcarbone's resilientSubscriptions behavior (just pass a closure with some Subscribe() calls), while still meeting the use case in my previous comment.
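Something like this (OnConnect and the config shape are still hypothetical):

cfg := ResilientClientConfig{
	// Fires on the first connection and on every reconnect.
	OnConnect: func(c *ResilientClient) {
		if err := c.Subscribe("chat.message", onChatEvent, nil); err != nil {
			log.Println("re-subscribe failed:", err)
		}
		// ...and re-fetch missed chat history here, per my earlier comment.
	},
}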

@gammazero
Owner

@kofalt Regarding your comment on "chat history": WAMP does have an advanced profile feature called Event History (https://wamp-proto.org/_static/gen/wamp_latest.html#event-history) that would allow the recovery of missed events. This requires the router to support it, though. If there is router support, then the resilient client should be able to handle automatic recovery of missed events.
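For reference, the spec exposes event history through a meta-procedure; a rough sketch of recovering missed events, assuming the router implements it and the subscription ID is known:

    // wamp.subscription.get_events is the meta procedure the spec defines
    // for event history; the limit of 50 is arbitrary.
    result, err := session.Call(ctx, "wamp.subscription.get_events",
        nil, wamp.List{subscriptionID, 50}, nil, "")
    if err != nil {
        // The router most likely does not support event history.
    }
    _ = result // each returned item describes one missed event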

As far as the OnConnect callback, that can certainly be provided so that the application can do something if a reconnect happens. However, I do not think that is the place to handle re-subscribe and re-register. I think the resilient client should always do that on reconnect. If the application wants to end a subscription or registration, then it should explicitly unsubscribe or unregister. In other words, the reconnection should be invisible unless a disconnect caused the application to time out or cancel some blocked action.

@gammazero
Owner

@dcarbone Where to begin... Interface design. That is the most critical part of all of this. If the interfaces support all of the use cases we agree on, in the safest and simplest way, then that will determine how all the rest works.

I say we edit wiki pages to propose resilient client interfaces, and then we can pick what we like best from them all and come up with a final design, or we can submit links to code in our repos, gists, etc. I say try to come up with a fairly complete set of function signatures, but do not provide implementation beyond them.

@martin31821
Contributor

@gammazero Re-subscribe and re-register should be handled by the client itself. The OnConnect callback should only be used to change the internal state of your application.
