Skip to content

axssbug/websocket

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

5 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

websocket

通用 WebSocket 服务端/客户端库,底层基于 coder/websocket,面向高并发推送场景设计。

设计目标

目标 说明
通用性 不绑定任何业务逻辑,通过 EventHandler 接口注入业务逻辑
高并发 每连接独立三级队列,外部回调非阻塞投递,per-connection 隔离处理
安全关闭 sync.Once 保证 Close 幂等,context 级联取消所有 goroutine
可观测 ConnectionCount()ErrWriteBufferFull、队列积压告警等可供上层监控
轻依赖 仅依赖 github.com/coder/websocketxsync,无框架绑定

为什么选 coder/websocket 而不是 gorilla/websocket

  • context 原生支持:所有读写操作接受 context.Context,超时/取消更自然
  • 内置 ping/pong:无需手动设置 SetPingHandler
  • 更小的 API 面Accept / Dial / Read / Write,不容易误用
  • gorilla 已停止维护(2023 年归档)

架构

HTTP 请求
  → Handler.ServeHTTP(由 h.Use(svc) 返回)
    → svc.OnConnect(r) → id     ← 业务方实现,认证逻辑自由(query/header/cookie)
    → websocket.Accept
    → newConnection
    → svc.OnConnected(conn)     ← 业务方实现,启动订阅等
    → conn.run()                ← 阻塞,启动三个 goroutine

每个 Connection 三级队列:

  readLoop goroutine            外部投递(NATS/EventBus callback)
  ┌──────────────────┐          conn.Enqueue("type", data) ← 非阻塞,立刻返回
  │ conn.Read(ctx)   │                   │
  │ ping → pong      │                   ▼
  │ → OnClientMessage│──────→  consumeCh (chan *QueueMessage, 2048)
  └──────────────────┘                   │
                                         ▼
                                consumeLoop goroutine
                                → svc.OnQueueMessage(conn, msg) ← per-connection 隔离
                                         │
                                         ▼
                                writeCh (chan any, 256)
                                         │
                                         ▼
                                writeLoop goroutine
                                → wsjson.Write()

不同连接的 OnQueueMessage 运行在各自独立的 goroutine 中,慢连接不影响其他连接的推送延迟。

核心类型

Handler

// NewHandler 创建连接池管理器,持有广播能力
func NewHandler(opts Options) *Handler

// Use 注册事件处理器,返回 http.Handler,可直接挂路由
func (h *Handler) Use(svc EventHandler) http.Handler

// 广播
func (h *Handler) BroadcastToClient(v any)                              // 直接写入所有连接,绕过消费队列
func (h *Handler) BroadcastToClientByID(id string, v any)               // 直接写入指定用户所有连接
func (h *Handler) BroadcastToQueue(msgType string, data any)            // 投入所有连接的消费队列,经 OnQueueMessage 过滤处理
func (h *Handler) BroadcastToQueueByID(id string, msgType string, data any)
func (h *Handler) ConnectionCount() int

EventHandler

// EventHandler 业务方实现此接口,注入到 Handler
type EventHandler interface {
    OnConnect(r *http.Request) (string, error)   // 认证,返回用户 ID
    OnConnected(conn *Connection)                // 连接建立后,启动订阅等
    OnClientMessage(conn *Connection, data []byte) // 客户端发来的消息,应快速处理
    OnQueueMessage(conn *Connection, msg *QueueMessage) // 队列消息,per-connection goroutine,可做重活
    OnClose(conn *Connection)                    // 连接关闭后,清理资源
}

QueueMessage

type QueueMessage struct {
    Type string    // 消息类型,如 "news.new"、"trade.update"
    Time time.Time // 投递时间,可用于延迟监控
    Data any       // 消息数据,struct 或 []byte 均可
}

Connection

Data any  // 业务方自由存放连接级别状态(NATS 订阅表、用户配置等)

func (c *Connection) ConnID() string                         // 连接唯一 ID(框架生成),同一用户多连接各不同
func (c *Connection) ID() string                             // 用户 ID(OnConnect 返回),同一用户多连接相同
func (c *Connection) WriteJSON(v any) error                  // 非阻塞写,队列满返回 ErrWriteBufferFull
func (c *Connection) Enqueue(msgType string, data any) error // 投入消费队列,非阻塞,队列满返回 ErrConsumeBufferFull
func (c *Connection) Close(reason string)

Client

func NewClient(url string, handler MessageHandler, opts ...ClientOptions) *Client
func (c *Client) Run()   // 阻塞,自动重连
func (c *Client) Stop()

快速上手

基础用法

// 1. 创建 Handler(连接池 + 广播)
h := ws.NewHandler(ws.Options{
    ConsumeBuffSize: 4096,
    Logger:          logrus.WithField("svc", "news"),
})

// 2. 业务 struct 实现 EventHandler 接口type NewsService struct {
    h       *ws.Handler // 持有 h 用于广播
    userSvc *UserService
}

func (s *NewsService) OnConnect(r *http.Request) (string, error) {
    return verifyJWT(r.Header.Get("Authorization"))
}

func (s *NewsService) OnConnected(conn *ws.Connection) {
    sub, _ := nats.Subscribe("news.new", func(msg *nats.Msg) {
        conn.Enqueue("news.new", msg.Data) // 非阻塞投递,不阻塞 NATS callback
    })
    conn.Data = sub
}

func (s *NewsService) OnClientMessage(conn *ws.Connection, data []byte) {
    // 快速处理客户端指令(subscribe/unsubscribe 等)
    var req Request
    json.Unmarshal(data, &req)
    // handle req...
}

func (s *NewsService) OnQueueMessage(conn *ws.Connection, msg *ws.QueueMessage) {
    // per-connection goroutine,可做重活,不影响其他连接
    switch msg.Type {
    case "news.new":
        event, _ := msg.Data.(*NewsEvent)
        if matchFilter(conn, event) {
            conn.WriteJSON(event)
        }
    }
}

func (s *NewsService) OnClose(conn *ws.Connection) {
    if sub, ok := conn.Data.(*nats.Subscription); ok {
        sub.Unsubscribe()
    }
}

// 3. 挂路由
svc := &NewsService{h: h, userSvc: NewUserService()}
http.Handle("/ws/news", h.Use(svc))
http.ListenAndServe(":8080", nil)

// 4. 广播(在任意地方调用)
h.BroadcastToQueue("news.new", &NewsEvent{...})

多路由,各自独立连接池

newsHandler    := ws.NewHandler(ws.Options{ConsumeBuffSize: 4096})
twitterHandler := ws.NewHandler(ws.Options{ConsumeBuffSize: 8192})
http.Handle("/ws/news",    newsHandler.Use(&NewsService{h: newsHandler}))
http.Handle("/ws/twitter", twitterHandler.Use(&TwitterService{h: twitterHandler}))

// 各自广播,互不影响
newsHandler.BroadcastToQueue("news.new", newsEvent)
twitterHandler.BroadcastToQueueByID(userID, "twitter.event", twitterEvent)

与 Echo 框架集成

e := echo.New()
e.GET("/ws/news",    echo.WrapHandler(newsHandler.Use(newsSvc)))
e.GET("/ws/twitter", echo.WrapHandler(twitterHandler.Use(twitterSvc)), middleware.Auth())

客户端

client := ws.NewClient(
    "ws://localhost:8080/ws/news",
    func(data []byte) { fmt.Println(string(data)) },
    ws.ClientOptions{ReconnectInterval: 3 * time.Second},
)
go client.Run()
client.Stop()

规划

v0.1(当前)

  • Handler:连接池、广播、三级队列内置
  • EventHandler:接口化设计,dispatch 天然正确
  • ConnectionEnqueue 外部投递、Data 业务状态、per-connection 隔离
  • QueueMessage:统一消息结构 {Type, Time, Data}
  • Client:自动重连
  • 多路由支持:每个 Handler 实例独立连接池

v0.2

  • 心跳检测:服务端主动 ping,超时自动断开
  • 连接限流:同一 ID 最大连接数限制
  • 指标暴露:Prometheus metrics(连接数、队列使用率、消息延迟)
  • 慢消息告警OnQueueMessage 超过阈值时触发回调

v0.3

  • 消息压缩AcceptOptions.CompressionMode 透传配置
  • TLS 客户端ClientOptions.TLSConfig
  • 分组广播BroadcastToGroup(group string, v any)

v1.0

  • 稳定 API:冻结公开接口,语义版本保证
  • 完整测试覆盖:单元测试 + 集成测试(httptest)
  • 基准测试:与 gorilla/websocket 对比吞吐量和延迟

依赖

用途
github.com/coder/websocket WebSocket 协议实现
github.com/puzpuzpuz/xsync/v4 无锁并发 Map,管理连接集合
github.com/sirupsen/logrus 日志
github.com/google/uuid 连接唯一 ID 生成

License

MIT

About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Packages

 
 
 

Contributors

Languages