Skip to content

Commit

Permalink
(FFM-1857) Streaming reconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
conormurray95 committed Dec 6, 2021
1 parent e06288b commit 510df99
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 44 deletions.
53 changes: 31 additions & 22 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func NewCfClient(sdkKey string, options ...ConfigOption) (*CfClient, error) {

go client.setAnalyticsServiceClient(ctx)

go client.retrieve(ctx)
go client.retrieveInitialData(ctx)

go client.streamConnect()

Expand All @@ -119,10 +119,8 @@ func (c *CfClient) IsInitialized() (bool, error) {
return false, fmt.Errorf("timeout waiting to initialize")
}

func (c *CfClient) retrieve(ctx context.Context) {
// check for first cycle of cron job
// for registering stream consumer
c.config.Logger.Info("Polling")
func (c *CfClient) retrieve(ctx context.Context) bool {
ok := true
var wg sync.WaitGroup
wg.Add(2)
go func() {
Expand All @@ -131,6 +129,7 @@ func (c *CfClient) retrieve(ctx context.Context) {
defer cancel()
err := c.retrieveFlags(rCtx)
if err != nil {
ok = false
c.config.Logger.Errorf("error while retrieving flags: %v", err.Error())
}
}()
Expand All @@ -141,12 +140,23 @@ func (c *CfClient) retrieve(ctx context.Context) {
defer cancel()
err := c.retrieveSegments(rCtx)
if err != nil {
c.config.Logger.Errorf("error while retrieving segments at startup: %v", err.Error())
ok = false
c.config.Logger.Errorf("error while retrieving segments: %v", err.Error())
}
}()
wg.Wait()
if ok {
c.config.Logger.Info("Data poll finished successfully")
} else {
c.config.Logger.Error("Data poll finished with errors")
}

return ok
}

func (c *CfClient) retrieveInitialData(ctx context.Context) {
c.retrieve(ctx)
c.initialized <- true
c.config.Logger.Info("Sync run finished")
}

func (c *CfClient) streamConnect() {
Expand All @@ -160,25 +170,22 @@ func (c *CfClient) streamConnect() {
defer c.mux.RUnlock()
c.config.Logger.Info("Registering SSE consumer")
sseClient := sse.NewClient(fmt.Sprintf("%s/stream?cluster=%s", c.config.url, c.clusterIdentifier))
conn := stream.NewSSEClient(c.sdkKey, c.token, sseClient, c.config.Cache, c.api, c.config.Logger)
err := conn.Connect(c.environmentID)
if err != nil {
c.streamConnected = false
return
}

c.streamConnected = true
err = conn.OnDisconnect(func() error {
streamErr := func() {
c.config.Logger.Error("Stream disconnected. Swapping to polling mode")
// Wait one minute before moving to polling
time.Sleep(1 * time.Minute)
c.mux.RLock()
defer c.mux.RUnlock()
c.streamConnected = false
return nil
})
if err != nil {
c.config.Logger.Errorf("error disconnecting the stream, err: %v", err)
}
conn := stream.NewSSEClient(c.sdkKey, c.token, sseClient, c.config.Cache, c.api, c.config.Logger, streamErr)

// Connect kicks off a goroutine that attempts to establish a stream connection
// while this is happening we set streamConnected to true - if any errors happen
// in this process streamConnected will be set back to false by the streamErr function
conn.Connect(c.environmentID)
c.streamConnected = true
}

func (c *CfClient) authenticate(ctx context.Context, target evaluation.Target) {
Expand Down Expand Up @@ -291,10 +298,12 @@ func (c *CfClient) pullCronJob(ctx context.Context) {
case <-pullingTicker.C:
c.mux.RLock()
if !c.streamConnected {
c.retrieve(ctx)
if c.config.enableStream {
ok := c.retrieve(ctx)
// we should only try and start the stream after the poll succeeded to make sure we get the latest changes
if ok && c.config.enableStream {
// here stream is enabled but not connected, so we attempt to reconnect
go c.streamConnect()
c.config.Logger.Info("Attempting to start stream")
c.streamConnect()
}
}

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ require (
github.com/stretchr/testify v1.5.1
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.16.0
gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect
)
45 changes: 23 additions & 22 deletions stream/sse.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,19 @@ import (
"github.com/harness/ff-golang-server-sdk/dto"
"github.com/harness/ff-golang-server-sdk/logger"
"github.com/harness/ff-golang-server-sdk/rest"
backoff "gopkg.in/cenkalti/backoff.v1"

jsoniter "github.com/json-iterator/go"
"github.com/r3labs/sse"
)

// SSEClient is Server Send Event object
type SSEClient struct {
api rest.ClientWithResponsesInterface
client *sse.Client
cache cache.Cache
logger logger.Logger
api rest.ClientWithResponsesInterface
client *sse.Client
cache cache.Cache
logger logger.Logger
onStreamError func()
}

var json = jsoniter.ConfigCompatibleWithStandardLibrary
Expand All @@ -32,20 +34,29 @@ func NewSSEClient(
cache cache.Cache,
api rest.ClientWithResponsesInterface,
logger logger.Logger,
onStreamError func(),
) *SSEClient {
client.Headers["Authorization"] = fmt.Sprintf("Bearer %s", token)
client.Headers["API-Key"] = apiKey
return &SSEClient{
client: client,
cache: cache,
api: api,
logger: logger,
client.OnDisconnect(func(client *sse.Client) {
onStreamError()
})
sseClient := &SSEClient{
client: client,
cache: cache,
api: api,
logger: logger,
onStreamError: onStreamError,
}
return sseClient
}

// Connect will subscribe to SSE stream
func (c *SSEClient) Connect(environment string) error {
func (c *SSEClient) Connect(environment string) {
c.logger.Infof("Start subscribing to Stream")
// don't use the default exponentialBackoff strategy - we have our own disconnect logic
// of polling the service then re-establishing a new stream once we can connect
c.client.ReconnectStrategy = &backoff.StopBackOff{}
// it is blocking operation, it needs to go in go routine
go func() {
err := c.client.Subscribe("*", func(msg *sse.Event) {
Expand Down Expand Up @@ -123,18 +134,8 @@ func (c *SSEClient) Connect(environment string) error {
}
})
if err != nil {
c.logger.Errorf("Error: %s", err.Error())
c.logger.Errorf("Error initializing stream: %s", err.Error())
c.onStreamError()
}
}()
return nil
}

// OnDisconnect will trigger func f when stream disconnects
func (c *SSEClient) OnDisconnect(f func() error) error {
c.client.OnDisconnect(func(client *sse.Client) {
if err := f(); err != nil {
c.logger.Errorf("error invoking func on stream disconnect, err: %s", err.Error())
}
})
return nil
}

0 comments on commit 510df99

Please sign in to comment.