Skip to content

Commit

Permalink
fix resub bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
Hongssd committed Jan 24, 2024
1 parent 47a541b commit 7cd143c
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 19 deletions.
45 changes: 30 additions & 15 deletions common_ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ type WsStreamClient struct {
resultChan chan []byte
errChan chan error
isClose bool

reSubscribeMu *sync.Mutex
}

// 登陆请求相关
Expand Down Expand Up @@ -150,6 +152,10 @@ func (sub *Subscription[T]) CloseChan() chan struct{} {
return sub.closeChan
}

func (ws *WsStreamClient) GetConn() *websocket.Conn {
return ws.conn
}

func (ws *WsStreamClient) login(op string, arg WsLoginArg) (*Subscription[WsActionResult], error) {
if ws == nil || ws.conn == nil || ws.isClose {
return nil, fmt.Errorf("websocket is close")
Expand Down Expand Up @@ -330,14 +336,14 @@ func (ws *WsStreamClient) OpenConn() error {
ws.conn = conn
ws.isClose = false
ws.connId = ""
log.Debug("OpenConn success to ", apiUrl)
log.Info("OpenConn success to ", apiUrl)
ws.handleResult(ws.resultChan, ws.errChan)
return err
} else {
conn, err := wsStreamServe(apiUrl, ws.resultChan, ws.errChan)
ws.conn = conn
ws.connId = ""
log.Debug("Auto ReOpenConn success to ", apiUrl)
log.Info("Auto ReOpenConn success to ", apiUrl)
return err
}

Expand Down Expand Up @@ -366,6 +372,7 @@ func (*MyOkx) NewPublicWsStreamClient() *PublicWsStreamClient {
waitSubResultMu: &sync.Mutex{},
waitOrderResult: nil,
waitOrderResultMu: &sync.Mutex{},
reSubscribeMu: &sync.Mutex{},
},
}
}
Expand All @@ -382,6 +389,7 @@ func (*MyOkx) NewPrivateWsStreamClient() *PrivateWsStreamClient {
waitSubResultMu: &sync.Mutex{},
waitOrderResult: nil,
waitOrderResultMu: &sync.Mutex{},
reSubscribeMu: &sync.Mutex{},
},
}
}
Expand All @@ -398,6 +406,7 @@ func (*MyOkx) NewBusinessWsStreamClient() *BusinessWsStreamClient {
waitSubResultMu: &sync.Mutex{},
waitOrderResult: nil,
waitOrderResultMu: &sync.Mutex{},
reSubscribeMu: &sync.Mutex{},
},
}
}
Expand Down Expand Up @@ -468,6 +477,8 @@ func (ws *WsStreamClient) sendWsCloseToAllSub() {
}

func (ws *WsStreamClient) reSubscribeForReconnect() error {
ws.reSubscribeMu.Lock()
defer ws.reSubscribeMu.Unlock()
isDoReSubscribe := map[int64]bool{}
for _, sub := range ws.commonSubMap {
if _, ok := isDoReSubscribe[sub.SubId]; ok {
Expand All @@ -485,10 +496,12 @@ func (ws *WsStreamClient) reSubscribeForReconnect() error {
return err
}
log.Infof("reSubscribe Success: args:%v", reSub.Args)

sub.SubId = reSub.SubId
isDoReSubscribe[sub.SubId] = true
time.Sleep(500 * time.Millisecond)
}
log.Info(ws.commonSubMap)
return nil
}

Expand All @@ -513,20 +526,22 @@ func (ws *WsStreamClient) handleResult(resultChan chan []byte, errChan chan erro
time.Sleep(1500 * time.Millisecond)
err = ws.OpenConn()
}

//重新登陆
if ws.lastLogin != nil && ws.client != nil {
err = ws.Login(ws.client)
for err != nil {
time.Sleep(1500 * time.Millisecond)
go func() {
//重新登陆
if ws.lastLogin != nil && ws.client != nil {
err = ws.Login(ws.client)
for err != nil {
time.Sleep(1500 * time.Millisecond)
err = ws.Login(ws.client)
}
}
}
//重新订阅
err = ws.reSubscribeForReconnect()
if err != nil {
log.Error(err)
}

//重新订阅
err = ws.reSubscribeForReconnect()
if err != nil {
log.Error(err)
}
}()
} else {
continue
}
Expand Down Expand Up @@ -753,7 +768,7 @@ func (ws *WsStreamClient) Login(client *RestClient) error {
if err != nil {
return err
}
log.Debugf("Login Success args:%v ", ws.lastLogin.Args)
log.Infof("Login Success args:%v ", ws.lastLogin.Args)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion ws_business.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (ws *BusinessWsStreamClient) SubscribeCandleMultiple(instId []string, inter
if err != nil {
return nil, err
}
log.Debugf("SubscribeCandle Success: args:%v", doSub.Args)
log.Infof("SubscribeCandle Success: args:%v", doSub.Args)
sub := &Subscription[WsCandles]{
SubId: doSub.SubId,
Op: SUBSCRIBE,
Expand Down
2 changes: 1 addition & 1 deletion ws_private.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (ws *PrivateWsStreamClient) SubscribeOrders(instType, instFamily, instId st
if err != nil {
return nil, err
}
log.Debugf("SubscribeOrders Success: args:%v", doSub.Args)
log.Infof("SubscribeOrders Success: args:%v", doSub.Args)
sub := &Subscription[WsOrders]{
SubId: doSub.SubId,
Op: SUBSCRIBE,
Expand Down
4 changes: 2 additions & 2 deletions ws_public.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (ws *PublicWsStreamClient) SubscribeBooksMultiple(instIds []string, wsBooks
if err != nil {
return nil, err
}
log.Debugf("SubscribeBooks Success: args:%v", doSub.Args)
log.Infof("SubscribeBooks Success: args:%v", doSub.Args)
sub := &Subscription[WsBooks]{
SubId: doSub.SubId,
Op: SUBSCRIBE,
Expand Down Expand Up @@ -73,7 +73,7 @@ func (ws *PublicWsStreamClient) SubscribeTradesMultiple(instIds []string) (*Subs
if err != nil {
return nil, err
}
log.Debugf("SubscribeTrades Success: args:%v", doSub.Args)
log.Infof("SubscribeTrades Success: args:%v", doSub.Args)
sub := &Subscription[WsTrades]{
SubId: doSub.SubId,
Op: SUBSCRIBE,
Expand Down

0 comments on commit 7cd143c

Please sign in to comment.