From 7cd143cb078ea78245e1acf7ce9975e9d8655a23 Mon Sep 17 00:00:00 2001 From: Hongssd Date: Wed, 24 Jan 2024 10:36:42 +0800 Subject: [PATCH] fix resub bugs --- common_ws.go | 45 ++++++++++++++++++++++++++++++--------------- ws_business.go | 2 +- ws_private.go | 2 +- ws_public.go | 4 ++-- 4 files changed, 34 insertions(+), 19 deletions(-) diff --git a/common_ws.go b/common_ws.go index 83f1959..4831335 100644 --- a/common_ws.go +++ b/common_ws.go @@ -59,6 +59,8 @@ type WsStreamClient struct { resultChan chan []byte errChan chan error isClose bool + + reSubscribeMu *sync.Mutex } // 登陆请求相关 @@ -150,6 +152,10 @@ func (sub *Subscription[T]) CloseChan() chan struct{} { return sub.closeChan } +func (ws *WsStreamClient) GetConn() *websocket.Conn { + return ws.conn +} + func (ws *WsStreamClient) login(op string, arg WsLoginArg) (*Subscription[WsActionResult], error) { if ws == nil || ws.conn == nil || ws.isClose { return nil, fmt.Errorf("websocket is close") @@ -330,14 +336,14 @@ func (ws *WsStreamClient) OpenConn() error { ws.conn = conn ws.isClose = false ws.connId = "" - log.Debug("OpenConn success to ", apiUrl) + log.Info("OpenConn success to ", apiUrl) ws.handleResult(ws.resultChan, ws.errChan) return err } else { conn, err := wsStreamServe(apiUrl, ws.resultChan, ws.errChan) ws.conn = conn ws.connId = "" - log.Debug("Auto ReOpenConn success to ", apiUrl) + log.Info("Auto ReOpenConn success to ", apiUrl) return err } @@ -366,6 +372,7 @@ func (*MyOkx) NewPublicWsStreamClient() *PublicWsStreamClient { waitSubResultMu: &sync.Mutex{}, waitOrderResult: nil, waitOrderResultMu: &sync.Mutex{}, + reSubscribeMu: &sync.Mutex{}, }, } } @@ -382,6 +389,7 @@ func (*MyOkx) NewPrivateWsStreamClient() *PrivateWsStreamClient { waitSubResultMu: &sync.Mutex{}, waitOrderResult: nil, waitOrderResultMu: &sync.Mutex{}, + reSubscribeMu: &sync.Mutex{}, }, } } @@ -398,6 +406,7 @@ func (*MyOkx) NewBusinessWsStreamClient() *BusinessWsStreamClient { waitSubResultMu: &sync.Mutex{}, waitOrderResult: nil, waitOrderResultMu: &sync.Mutex{}, + reSubscribeMu: &sync.Mutex{}, }, } } @@ -468,6 +477,8 @@ func (ws *WsStreamClient) sendWsCloseToAllSub() { } func (ws *WsStreamClient) reSubscribeForReconnect() error { + ws.reSubscribeMu.Lock() + defer ws.reSubscribeMu.Unlock() isDoReSubscribe := map[int64]bool{} for _, sub := range ws.commonSubMap { if _, ok := isDoReSubscribe[sub.SubId]; ok { @@ -485,10 +496,12 @@ func (ws *WsStreamClient) reSubscribeForReconnect() error { return err } log.Infof("reSubscribe Success: args:%v", reSub.Args) + sub.SubId = reSub.SubId isDoReSubscribe[sub.SubId] = true time.Sleep(500 * time.Millisecond) } + log.Info(ws.commonSubMap) return nil } @@ -513,20 +526,22 @@ func (ws *WsStreamClient) handleResult(resultChan chan []byte, errChan chan erro time.Sleep(1500 * time.Millisecond) err = ws.OpenConn() } - - //重新登陆 - if ws.lastLogin != nil && ws.client != nil { - err = ws.Login(ws.client) - for err != nil { - time.Sleep(1500 * time.Millisecond) + go func() { + //重新登陆 + if ws.lastLogin != nil && ws.client != nil { err = ws.Login(ws.client) + for err != nil { + time.Sleep(1500 * time.Millisecond) + err = ws.Login(ws.client) + } } - } - //重新订阅 - err = ws.reSubscribeForReconnect() - if err != nil { - log.Error(err) - } + + //重新订阅 + err = ws.reSubscribeForReconnect() + if err != nil { + log.Error(err) + } + }() } else { continue } @@ -753,7 +768,7 @@ func (ws *WsStreamClient) Login(client *RestClient) error { if err != nil { return err } - log.Debugf("Login Success args:%v ", ws.lastLogin.Args) + log.Infof("Login Success args:%v ", ws.lastLogin.Args) return nil } diff --git a/ws_business.go b/ws_business.go index 035367c..5e41974 100644 --- a/ws_business.go +++ b/ws_business.go @@ -29,7 +29,7 @@ func (ws *BusinessWsStreamClient) SubscribeCandleMultiple(instId []string, inter if err != nil { return nil, err } - log.Debugf("SubscribeCandle Success: args:%v", doSub.Args) + log.Infof("SubscribeCandle Success: args:%v", doSub.Args) sub := &Subscription[WsCandles]{ SubId: doSub.SubId, Op: SUBSCRIBE, diff --git a/ws_private.go b/ws_private.go index 6a4209c..c9e8486 100644 --- a/ws_private.go +++ b/ws_private.go @@ -26,7 +26,7 @@ func (ws *PrivateWsStreamClient) SubscribeOrders(instType, instFamily, instId st if err != nil { return nil, err } - log.Debugf("SubscribeOrders Success: args:%v", doSub.Args) + log.Infof("SubscribeOrders Success: args:%v", doSub.Args) sub := &Subscription[WsOrders]{ SubId: doSub.SubId, Op: SUBSCRIBE, diff --git a/ws_public.go b/ws_public.go index 1967569..51c122d 100644 --- a/ws_public.go +++ b/ws_public.go @@ -29,7 +29,7 @@ func (ws *PublicWsStreamClient) SubscribeBooksMultiple(instIds []string, wsBooks if err != nil { return nil, err } - log.Debugf("SubscribeBooks Success: args:%v", doSub.Args) + log.Infof("SubscribeBooks Success: args:%v", doSub.Args) sub := &Subscription[WsBooks]{ SubId: doSub.SubId, Op: SUBSCRIBE, @@ -73,7 +73,7 @@ func (ws *PublicWsStreamClient) SubscribeTradesMultiple(instIds []string) (*Subs if err != nil { return nil, err } - log.Debugf("SubscribeTrades Success: args:%v", doSub.Args) + log.Infof("SubscribeTrades Success: args:%v", doSub.Args) sub := &Subscription[WsTrades]{ SubId: doSub.SubId, Op: SUBSCRIBE,