Skip to content

Commit

Permalink
fix(wsconncache): only allow one peer per connection
Browse files Browse the repository at this point in the history
If an agent went away and reconnected, the wsconncache connection would
be polluted for about 10m because there would be two peers with the
same IP. The old peer always had priority, which caused the dashboard to
try and always dial the old peer until it was removed.

Fixes: #5292
  • Loading branch information
coadler committed Jan 26, 2023
1 parent b0a1615 commit 269bc1e
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 6 deletions.
28 changes: 22 additions & 6 deletions coderd/workspaceagents.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,18 +278,21 @@ func (api *API) workspaceAgentPTY(rw http.ResponseWriter, r *http.Request) {
_, wsNetConn := websocketNetConn(ctx, conn, websocket.MessageBinary)
defer wsNetConn.Close() // Also closes conn.

fmt.Println("getting conn from cache")
agentConn, release, err := api.workspaceAgentCache.Acquire(r, workspaceAgent.ID)
if err != nil {
_ = conn.Close(websocket.StatusInternalError, httpapi.WebsocketCloseSprintf("dial workspace agent: %s", err))
return
}
defer release()
fmt.Println("got conn from cache")
ptNetConn, err := agentConn.ReconnectingPTY(ctx, reconnect, uint16(height), uint16(width), r.URL.Query().Get("command"))
if err != nil {
_ = conn.Close(websocket.StatusInternalError, httpapi.WebsocketCloseSprintf("dial: %s", err))
return
}
defer ptNetConn.Close()
fmt.Println("dialed reconnecting pty, doing bicopy")
agent.Bicopy(ctx, wsNetConn, ptNetConn)
}

Expand Down Expand Up @@ -402,11 +405,11 @@ func (api *API) workspaceAgentListeningPorts(rw http.ResponseWriter, r *http.Req

func (api *API) dialWorkspaceAgentTailnet(r *http.Request, agentID uuid.UUID) (*codersdk.AgentConn, error) {
clientConn, serverConn := net.Pipe()
go func() {
<-r.Context().Done()
_ = clientConn.Close()
_ = serverConn.Close()
}()
// go func() {
// <-r.Context().Done()
// _ = clientConn.Close()
// _ = serverConn.Close()
// }()

derpMap := api.DERPMap.Clone()
for _, region := range derpMap.Regions {
Expand Down Expand Up @@ -453,7 +456,16 @@ func (api *API) dialWorkspaceAgentTailnet(r *http.Request, agentID uuid.UUID) (*
}

sendNodes, _ := tailnet.ServeCoordinator(clientConn, func(node []*tailnet.Node) error {
return conn.UpdateNodes(node)
err := conn.RemoveAllPeers()
if err != nil {
return xerrors.Errorf("remove all peers: %w", err)
}

err = conn.UpdateNodes(node)
if err != nil {
return xerrors.Errorf("update nodes: %w", err)
}
return nil
})
conn.SetNodeCallback(sendNodes)
go func() {
Expand All @@ -465,6 +477,10 @@ func (api *API) dialWorkspaceAgentTailnet(r *http.Request, agentID uuid.UUID) (*
}()
return &codersdk.AgentConn{
Conn: conn,
CloseFunc: func() {
_ = clientConn.Close()
_ = serverConn.Close()
},
}, nil
}

Expand Down
25 changes: 25 additions & 0 deletions tailnet/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,31 @@ func (c *Conn) SetDERPMap(derpMap *tailcfg.DERPMap) {
c.wireguardEngine.SetDERPMap(derpMap)
}

func (c *Conn) RemoveAllPeers() error {
c.mutex.Lock()
defer c.mutex.Unlock()

c.netMap.Peers = []*tailcfg.Node{}
c.peerMap = map[tailcfg.NodeID]*tailcfg.Node{}
netMapCopy := *c.netMap
c.wireguardEngine.SetNetworkMap(&netMapCopy)
cfg, err := nmcfg.WGCfg(c.netMap, Logger(c.logger.Named("wgconfig")), netmap.AllowSingleHosts, "")
if err != nil {
return xerrors.Errorf("update wireguard config: %w", err)
}
err = c.wireguardEngine.Reconfig(cfg, c.wireguardRouter, &dns.Config{}, &tailcfg.Debug{})
if err != nil {
if c.isClosed() {
return nil
}
if errors.Is(err, wgengine.ErrNoChanges) {
return nil
}
return xerrors.Errorf("reconfig: %w", err)
}
return nil
}

// UpdateNodes connects with a set of peers. This can be constantly updated,
// and peers will continually be reconnected as necessary.
func (c *Conn) UpdateNodes(nodes []*Node) error {
Expand Down

0 comments on commit 269bc1e

Please sign in to comment.