Skip to content

Commit

Permalink
feat: add debug info to HA coordinator (#5883)
Browse files Browse the repository at this point in the history
  • Loading branch information
coadler committed Jan 26, 2023
1 parent 52ecd35 commit cc694a5
Show file tree
Hide file tree
Showing 2 changed files with 188 additions and 131 deletions.
74 changes: 60 additions & 14 deletions enterprise/tailnet/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

"github.com/google/uuid"
lru "github.com/hashicorp/golang-lru/v2"
"golang.org/x/xerrors"

"cdr.dev/slog"
Expand All @@ -24,15 +25,22 @@ import (
// that uses PostgreSQL pubsub to exchange handshakes.
func NewCoordinator(logger slog.Logger, pubsub database.Pubsub) (agpl.Coordinator, error) {
ctx, cancelFunc := context.WithCancel(context.Background())

nameCache, err := lru.New[uuid.UUID, string](512)
if err != nil {
panic("make lru cache: " + err.Error())
}

coord := &haCoordinator{
id: uuid.New(),
log: logger,
pubsub: pubsub,
closeFunc: cancelFunc,
close: make(chan struct{}),
nodes: map[uuid.UUID]*agpl.Node{},
agentSockets: map[uuid.UUID]net.Conn{},
agentToConnectionSockets: map[uuid.UUID]map[uuid.UUID]net.Conn{},
agentSockets: map[uuid.UUID]*agpl.TrackedConn{},
agentToConnectionSockets: map[uuid.UUID]map[uuid.UUID]*agpl.TrackedConn{},
agentNameCache: nameCache,
}

if err := coord.runPubsub(ctx); err != nil {
Expand All @@ -53,10 +61,14 @@ type haCoordinator struct {
// nodes maps agent and connection IDs to their respective node.
nodes map[uuid.UUID]*agpl.Node
// agentSockets maps agent IDs to their open websocket.
agentSockets map[uuid.UUID]net.Conn
agentSockets map[uuid.UUID]*agpl.TrackedConn
// agentToConnectionSockets maps agent IDs to connection IDs of conns that
// are subscribed to updates for that agent.
agentToConnectionSockets map[uuid.UUID]map[uuid.UUID]net.Conn
agentToConnectionSockets map[uuid.UUID]map[uuid.UUID]*agpl.TrackedConn

// agentNameCache holds a cache of agent names. If one of them disappears,
// it's helpful to have a name cached for debugging.
agentNameCache *lru.Cache[uuid.UUID, string]
}

// Node returns an in-memory node by ID.
Expand Down Expand Up @@ -94,12 +106,18 @@ func (c *haCoordinator) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID
c.mutex.Lock()
connectionSockets, ok := c.agentToConnectionSockets[agent]
if !ok {
connectionSockets = map[uuid.UUID]net.Conn{}
connectionSockets = map[uuid.UUID]*agpl.TrackedConn{}
c.agentToConnectionSockets[agent] = connectionSockets
}

// Insert this connection into a map so the agent can publish node updates.
connectionSockets[id] = conn
now := time.Now().Unix()
// Insert this connection into a map so the agent
// can publish node updates.
connectionSockets[id] = &agpl.TrackedConn{
Conn: conn,
Start: now,
LastWrite: now,
}
c.mutex.Unlock()

defer func() {
Expand Down Expand Up @@ -176,7 +194,9 @@ func (c *haCoordinator) handleNextClientMessage(id, agent uuid.UUID, decoder *js

// ServeAgent accepts a WebSocket connection to an agent that listens to
// incoming connections and publishes node updates.
func (c *haCoordinator) ServeAgent(conn net.Conn, id uuid.UUID, _ string) error {
func (c *haCoordinator) ServeAgent(conn net.Conn, id uuid.UUID, name string) error {
c.agentNameCache.Add(id, name)

// Tell clients on other instances to send a callmemaybe to us.
err := c.publishAgentHello(id)
if err != nil {
Expand All @@ -196,21 +216,41 @@ func (c *haCoordinator) ServeAgent(conn net.Conn, id uuid.UUID, _ string) error
}
}

// This uniquely identifies a connection that belongs to this goroutine.
unique := uuid.New()
now := time.Now().Unix()
overwrites := int64(0)

// If an old agent socket is connected, we close it
// to avoid any leaks. This shouldn't ever occur because
// we expect one agent to be running.
c.mutex.Lock()
oldAgentSocket, ok := c.agentSockets[id]
if ok {
overwrites = oldAgentSocket.Overwrites + 1
_ = oldAgentSocket.Close()
}
c.agentSockets[id] = conn
c.agentSockets[id] = &agpl.TrackedConn{
ID: unique,
Conn: conn,

Name: name,
Start: now,
LastWrite: now,
Overwrites: overwrites,
}
c.mutex.Unlock()

defer func() {
c.mutex.Lock()
defer c.mutex.Unlock()
delete(c.agentSockets, id)
delete(c.nodes, id)

// Only delete the connection if it's ours. It could have been
// overwritten.
if idConn, ok := c.agentSockets[id]; ok && idConn.ID == unique {
delete(c.agentSockets, id)
delete(c.nodes, id)
}
}()

decoder := json.NewDecoder(conn)
Expand Down Expand Up @@ -576,8 +616,14 @@ func (c *haCoordinator) formatAgentUpdate(id uuid.UUID, node *agpl.Node) ([]byte
return buf.Bytes(), nil
}

func (*haCoordinator) ServeHTTPDebug(w http.ResponseWriter, _ *http.Request) {
func (c *haCoordinator) ServeHTTPDebug(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/html; charset=utf-8")
fmt.Fprintf(w, "<h1>coordinator</h1>")
fmt.Fprintf(w, "<h2>ha debug coming soon</h2>")

c.mutex.RLock()
defer c.mutex.RUnlock()

fmt.Fprintln(w, "<h1>high-availability wireguard coordinator debug</h1>")
fmt.Fprintln(w, "<h4 style=\"margin-top:-25px\">warning: this only provides info from the node that served the request, if there are multiple replicas this data may be incomplete</h4>")

agpl.CoordinatorHTTPDebug(c.agentSockets, c.agentToConnectionSockets, c.agentNameCache)(w, r)
}

0 comments on commit cc694a5

Please sign in to comment.