Skip to content

Commit

Permalink
add sync mode for subscription events
Browse files Browse the repository at this point in the history
  • Loading branch information
hgiasac committed Nov 25, 2023
1 parent 43b90bc commit 1c18ddb
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 4 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,9 @@ client.
WithExitWhenNoSubscription(false).
// WithRetryStatusCodes allow retry the subscription connection when receiving one of these codes
// the input parameter can be number string or range, e.g 4000-5000
WithRetryStatusCodes("4000", "4000-4050")
WithRetryStatusCodes("4000", "4000-4050").
// WithSyncMode subscription messages are executed in sequence (without goroutine)
WithSyncMode(true)
```
#### Subscription Protocols
Expand Down
18 changes: 16 additions & 2 deletions subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ type SubscriptionClient struct {
onError func(sc *SubscriptionClient, err error) error
errorChan chan error
exitWhenNoSubscription bool
syncMode bool
keepAliveInterval time.Duration
retryDelay time.Duration
mutex sync.Mutex
Expand Down Expand Up @@ -464,6 +465,12 @@ func (sc *SubscriptionClient) WithExitWhenNoSubscription(value bool) *Subscripti
return sc
}

// WithSyncMode subscription messages are executed in sequence (without goroutine)
func (sc *SubscriptionClient) WithSyncMode(value bool) *SubscriptionClient {
sc.syncMode = value
return sc
}

// Keep alive subroutine to send ping on specified interval
func startKeepAlive(ctx context.Context, c WebsocketConn, interval time.Duration) {
ticker := time.NewTicker(interval)
Expand Down Expand Up @@ -806,13 +813,20 @@ func (sc *SubscriptionClient) Run() error {
if sub == nil {
sub = &Subscription{}
}
go func() {

execMessage := func() {
if err := sc.protocol.OnMessage(subContext, *sub, message); err != nil {
sc.errorChan <- err
}

sc.checkSubscriptionStatuses(subContext)
}()
}

if sc.syncMode {
execMessage()
} else {
go execMessage()
}
}
}
}()
Expand Down
12 changes: 11 additions & 1 deletion subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import (
"nhooyr.io/websocket"
)

func TestSubscription_LifeCycleEvents(t *testing.T) {
func testSubscription_LifeCycleEvents(t *testing.T, syncMode bool) {

server := subscription_setupServer(8082)
client, subscriptionClient := subscription_setupClients(8082)
msg := randomID()
Expand Down Expand Up @@ -84,6 +85,7 @@ func TestSubscription_LifeCycleEvents(t *testing.T) {
subscriptionClient = subscriptionClient.
WithExitWhenNoSubscription(false).
WithTimeout(3 * time.Second).
WithSyncMode(syncMode).
OnConnected(func() {
lock.Lock()
defer lock.Unlock()
Expand Down Expand Up @@ -200,6 +202,14 @@ func TestSubscription_LifeCycleEvents(t *testing.T) {
}
}

func TestSubscription_LifeCycleEvents(t *testing.T) {
testSubscription_LifeCycleEvents(t, false)
}

func TestSubscription_WithSyncMode(t *testing.T) {
testSubscription_LifeCycleEvents(t, true)
}

func TestSubscription_WithRetryStatusCodes(t *testing.T) {
stop := make(chan bool)
msg := randomID()
Expand Down

0 comments on commit 1c18ddb

Please sign in to comment.