Skip to content

Commit

Permalink
feat: Allow reconnects without losing state/focus. #2252 (#2253)
Browse files Browse the repository at this point in the history
  • Loading branch information
mturoci committed Feb 7, 2024
1 parent ca27772 commit 26f3c73
Show file tree
Hide file tree
Showing 9 changed files with 113 additions and 34 deletions.
4 changes: 4 additions & 0 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type Broker struct {
unicasts map[string]bool // "/client_id" => true
unicastsMux sync.RWMutex // mutex for tracking unicast routes
keepAppLive bool
clientsByID map[string]*Client
}

func newBroker(site *Site, editable, noStore, noLog, keepAppLive, debug bool) *Broker {
Expand All @@ -96,6 +97,7 @@ func newBroker(site *Site, editable, noStore, noLog, keepAppLive, debug bool) *B
make(map[string]bool),
sync.RWMutex{},
keepAppLive,
make(map[string]*Client),
}
}

Expand Down Expand Up @@ -263,6 +265,7 @@ func (b *Broker) addClient(route string, client *Client) {

b.unicastsMux.Lock()
b.unicasts["/"+client.id] = true
b.clientsByID[client.id] = client
b.unicastsMux.Unlock()

echo(Log{"t": "ui_add", "addr": client.addr, "route": route})
Expand Down Expand Up @@ -291,6 +294,7 @@ func (b *Broker) dropClient(client *Client) {

b.unicastsMux.Lock()
delete(b.unicasts, "/"+client.id)
delete(b.clientsByID, client.id)
b.unicastsMux.Unlock()

echo(Log{"t": "ui_drop", "addr": client.addr})
Expand Down
67 changes: 43 additions & 24 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,23 +54,29 @@ type BootMsg struct {

// Client represent a websocket (UI) client.
type Client struct {
id string // unique id
auth *Auth // auth provider, might be nil
addr string // remote IP:port, used for logging only
session *Session // end-user session
broker *Broker // broker
conn *websocket.Conn // connection
routes []string // watched routes
data chan []byte // send data
editable bool // allow editing? // TODO move to user; tie to role
baseURL string // URL prefix of the Wave server
header *http.Header // forwarded headers from the WS connection
appPath string // path of the app this client is connected to, doesn't change throughout WS lifetime
pingInterval time.Duration
id string // unique id
auth *Auth // auth provider, might be nil
addr string // remote IP:port, used for logging only
session *Session // end-user session
broker *Broker // broker
conn *websocket.Conn // connection
routes []string // watched routes
data chan []byte // send data
editable bool // allow editing? // TODO move to user; tie to role
baseURL string // URL prefix of the Wave server
header *http.Header // forwarded headers from the WS connection
appPath string // path of the app this client is connected to, doesn't change throughout WS lifetime
pingInterval time.Duration
isReconnect bool
cancel context.CancelFunc
reconnectTimeout time.Duration
}

func newClient(addr string, auth *Auth, session *Session, broker *Broker, conn *websocket.Conn, editable bool, baseURL string, header *http.Header, pingInterval time.Duration) *Client {
return &Client{uuid.New().String(), auth, addr, session, broker, conn, nil, make(chan []byte, 256), editable, baseURL, header, "", pingInterval}
// TODO: Refactor some of the params into a Config struct.
func newClient(addr string, auth *Auth, session *Session, broker *Broker, conn *websocket.Conn, editable bool,
baseURL string, header *http.Header, pingInterval time.Duration, isReconnect bool, reconnectTimeout time.Duration) *Client {
id := uuid.New().String()
return &Client{id, auth, addr, session, broker, conn, nil, make(chan []byte, 256), editable, baseURL, header, "", pingInterval, isReconnect, nil, reconnectTimeout}
}

func (c *Client) refreshToken() error {
Expand All @@ -90,15 +96,26 @@ func (c *Client) refreshToken() error {

func (c *Client) listen() {
defer func() {
app := c.broker.getApp(c.appPath)
if app != nil {
app.forward(c.id, c.session, disconnectMsg)
if err := app.disconnect(c.id); err != nil {
echo(Log{"t": "disconnect", "client": c.addr, "route": c.appPath, "err": err.Error()})
ctx, cancel := context.WithCancel(context.Background())
c.cancel = cancel
go func(ctx context.Context) {
select {
// Send disconnect message only if client doesn't reconnect within the specified timeframe.
case <-time.After(c.reconnectTimeout):
app := c.broker.getApp(c.appPath)
if app != nil {
app.forward(c.id, c.session, disconnectMsg)
if err := app.disconnect(c.id); err != nil {
echo(Log{"t": "disconnect", "client": c.addr, "route": c.appPath, "err": err.Error()})
}
}

c.broker.unsubscribe <- c
case <-ctx.Done():
return
}
}
}(ctx)

c.broker.unsubscribe <- c
c.conn.Close()
}()
// Time allowed to read the next pong message from the peer. Must be greater than ping interval.
Expand Down Expand Up @@ -157,8 +174,10 @@ func (c *Client) listen() {
c.broker.sendAll(c.broker.clients[app.route], clearStateMsg)
}
case watchMsgT:
c.subscribe(m.addr) // subscribe even if page is currently NA

if c.isReconnect {
continue
}
c.subscribe(m.addr) // subscribe even if page is currently NA
if app := c.broker.getApp(m.addr); app != nil { // do we have an app handling this route?
c.appPath = m.addr
switch app.mode {
Expand Down
7 changes: 7 additions & 0 deletions cmd/wave/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ func main() {
panic(err)
}

// TODO: Handle this at the config parser level.
if authConf.SessionExpiry, err = time.ParseDuration(conf.SessionExpiry); err != nil {
panic(err)
}
Expand All @@ -189,6 +190,10 @@ func main() {
panic(err)
}

if serverConf.ReconnectTimeout, err = time.ParseDuration(conf.ReconnectTimeout); err != nil {
panic(err)
}

serverConf.WebDir, _ = filepath.Abs(conf.WebDir)
serverConf.DataDir, _ = filepath.Abs(conf.DataDir)
serverConf.Version = Version
Expand Down Expand Up @@ -227,6 +232,8 @@ func main() {
authConf.URLParameters = append(authConf.URLParameters, kv)
}
}

// TODO: Handle this at the config parser level.
if authConf.SessionExpiry, err = time.ParseDuration(conf.SessionExpiry); err != nil {
panic(err)
}
Expand Down
2 changes: 2 additions & 0 deletions conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type ServerConf struct {
ForwardedHeaders map[string]bool
KeepAppLive bool
PingInterval time.Duration
ReconnectTimeout time.Duration
}

type AuthConf struct {
Expand Down Expand Up @@ -112,4 +113,5 @@ type Conf struct {
SkipLogin bool `cfg:"oidc-skip-login" env:"H2O_WAVE_OIDC_SKIP_LOGIN" cfgDefault:"false" cfgHelper:"do not display the login form during OIDC authorization"`
KeepAppLive bool `cfg:"keep-app-live" env:"H2O_WAVE_KEEP_APP_LIVE" cfgDefault:"false" cfgHelper:"do not unregister unresponsive apps"`
Conf string `cfg:"conf" env:"H2O_WAVE_CONF" cfgDefault:".env" cfgHelper:"path to configuration file"`
ReconnectTimeout string `cfg:"reconnect-timeout" env:"H2O_WAVE_RECONNECT_TIMEOUT" cfgDefault:"2s" cfgHelper:"Time to wait for reconnect before dropping the client"`
}
1 change: 1 addition & 0 deletions protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type OpsD struct {
E string `json:"e,omitempty"` // error
M *Meta `json:"m,omitempty"` // metadata
C int `json:"c,omitempty"` // clear UI state
I string `json:"i,omitempty"` // client id
}

// Meta represents metadata unrelated to commands
Expand Down
2 changes: 1 addition & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func Run(conf ServerConf) {
handle("_auth/refresh", newRefreshHandler(auth, conf.Keychain))
}

handle("_s/", newSocketServer(broker, auth, conf.Editable, conf.BaseURL, conf.ForwardedHeaders, conf.PingInterval)) // XXX terminate sockets when logged out
handle("_s/", newSocketServer(broker, auth, conf.Editable, conf.BaseURL, conf.ForwardedHeaders, conf.PingInterval, conf.ReconnectTimeout))

fileDir := filepath.Join(conf.DataDir, "f")
handle("_f/", newFileServer(fileDir, conf.Keychain, auth, conf.BaseURL+"_f"))
Expand Down
30 changes: 27 additions & 3 deletions socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ type SocketServer struct {
baseURL string
forwardedHeaders map[string]bool
pingInterval time.Duration
reconnectTimeout time.Duration
}

func newSocketServer(broker *Broker, auth *Auth, editable bool, baseURL string, forwardedHeaders map[string]bool, pingInterval time.Duration) *SocketServer {
return &SocketServer{broker, auth, editable, baseURL, forwardedHeaders, pingInterval}
func newSocketServer(broker *Broker, auth *Auth, editable bool, baseURL string, forwardedHeaders map[string]bool, pingInterval, reconnectTimeout time.Duration) *SocketServer {
return &SocketServer{broker, auth, editable, baseURL, forwardedHeaders, pingInterval, reconnectTimeout}
}

func (s *SocketServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -70,8 +71,31 @@ func (s *SocketServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}
}
clientID := r.URL.Query().Get("client-id")
client, ok := s.broker.clientsByID[clientID]
if ok {
client.conn = conn
client.isReconnect = true
if client.cancel != nil {
client.cancel()
}
if s.broker.debug {
echo(Log{"t": "socket_reconnect", "client_id": clientID, "addr": getRemoteAddr(r)})
}
} else {
client = newClient(getRemoteAddr(r), s.auth, session, s.broker, conn, s.editable, s.baseURL, &header, s.pingInterval, false, s.reconnectTimeout)
}

if msg, err := json.Marshal(OpsD{I: client.id}); err == nil {
sw, err := conn.NextWriter(websocket.TextMessage)
if err != nil {
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
return
}
sw.Write(msg)
sw.Close()
}

client := newClient(getRemoteAddr(r), s.auth, session, s.broker, conn, s.editable, s.baseURL, &header, s.pingInterval)
go client.flush()
go client.listen()
}
Expand Down
31 changes: 26 additions & 5 deletions ui/src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ interface OpsD {
e: B // can the user edit pages?
}
c?: U // clear UI state
i?: S // client id
}
interface OpD {
k?: S
Expand Down Expand Up @@ -938,14 +939,21 @@ export const
let
_socket: WebSocket | null = null,
_page: XPage | null = null,
_backoff = 1
_backoff = 1,
_reconnectFailures = 0,
_clientID = ''

const
slug = window.location.pathname,
reconnect = (address: S) => {
if (_clientID && !address.includes('?client-id')) {
address = `${address}?${new URLSearchParams({ 'client-id': _clientID })}`
}

const retry = () => reconnect(address)
const socket = new WebSocket(address)
socket.onopen = () => {
_reconnectFailures = 0
_socket = socket
handle(connectEvent)
_backoff = 1
Expand All @@ -954,11 +962,17 @@ export const
}
socket.onclose = () => {
const refreshRate = refreshRateB()
if (refreshRate === 0) return

// TODO handle refreshRate > 0 case
if (refreshRate === 0) return

_socket = null

// If on unstable network, retry immediately if we haven't failed before.
if (!_reconnectFailures) {
retry()
return
}

_page = null
_backoff *= 2
if (_backoff > 16) _backoff = 16
Expand Down Expand Up @@ -994,6 +1008,8 @@ export const
} else if (msg.m) {
const { u: username, e: editable } = msg.m
handle({ t: WaveEventType.Config, username, editable })
} else if (msg.i) {
_clientID = msg.i
}
} catch (error) {
console.error(error)
Expand All @@ -1003,10 +1019,15 @@ export const
}
socket.onerror = () => {
handle(dataEvent)
_reconnectFailures++
}
},
push = (data: any) => {
if (!_socket) return
push = (data: unknown) => {
if (!_socket) {
// Maybe currently reconnecting. Try again in 500ms.
if (!_reconnectFailures) setTimeout(() => push(data), 500)
return
}
_socket.send(`@ ${slug} ${JSON.stringify(data || {})}`)
},
fork = (): ChangeSet => {
Expand Down
3 changes: 2 additions & 1 deletion website/docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ Wave allows starting Wave server in 2 ways:
Wave can be configured via configuration (`.env`) file, environment variables or command line arguments with the following priority: `cmd arg > env var > config > default`.

<!-- CREDIT: https://www.tablesgenerator.com/markdown_tables. -->

<!-- https://github.com/h2oai/wave/issues/2256 -->
| ENV var or config (wave run or waved) | CLI args (waved) | Description |
|----------------------------------------|---------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| H2O_WAVE_ACCESS_KEY_ID | -access-key-id string | default API access key ID (default "access_key_id") |
Expand Down Expand Up @@ -58,6 +58,7 @@ Wave can be configured via configuration (`.env`) file, environment variables or
| H2O_WAVE_WEB_DIR | -web-dir string | directory to serve web assets from (default "./www") |
| H2O_WAVE_CONF | -conf string | path to a configuration file (default ".env") |
| H2O_WAVE_PING_INTERVAL | -ping-interval string | how often should ping messages be sent (e.g. 60s or 1m or 0.1h) to keep the websocket connection alive (default "50s") |
| H2O_WAVE_RECONNECT_TIMEOUT | -reconnect-timeout string | Time to wait for reconnect before dropping the client (default "2s") |
[^1]: `1`, `t`, `true` to enable; `0`, `f`, `false` to disable (case insensitive).
[^2]: Use OS-specific path list separator to specify multiple arguments - `:` for Linux/OSX and `;` for Windows. For example, `H2O_WAVE_PUBLIC_DIR=/images/@./files/images:/downloads/@./files/downloads`.

Expand Down

0 comments on commit 26f3c73

Please sign in to comment.