Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(FFM-1857) Streaming reconnect #58

Merged
merged 2 commits into from
Dec 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 51 additions & 38 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,22 @@ import (
// that any pending analytics events have been delivered.
//
type CfClient struct {
mux sync.RWMutex
api rest.ClientWithResponsesInterface
metricsapi metricsclient.ClientWithResponsesInterface
sdkKey string
auth rest.AuthenticationRequest
config *config
environmentID string
token string
persistence cache.Persistence
cancelFunc context.CancelFunc
streamConnected bool
authenticated chan struct{}
initialized chan bool
analyticsService *analyticsservice.AnalyticsService
clusterIdentifier string
mux sync.RWMutex
api rest.ClientWithResponsesInterface
metricsapi metricsclient.ClientWithResponsesInterface
sdkKey string
auth rest.AuthenticationRequest
config *config
environmentID string
token string
persistence cache.Persistence
cancelFunc context.CancelFunc
streamConnected bool
streamConnectedLock sync.RWMutex
authenticated chan struct{}
initialized chan bool
analyticsService *analyticsservice.AnalyticsService
clusterIdentifier string
}

// NewCfClient creates a new client instance that connects to CF with the default configuration.
Expand Down Expand Up @@ -96,7 +97,7 @@ func NewCfClient(sdkKey string, options ...ConfigOption) (*CfClient, error) {

go client.setAnalyticsServiceClient(ctx)

go client.retrieve(ctx)
go client.retrieveInitialData(ctx)

go client.streamConnect()

Expand All @@ -119,10 +120,8 @@ func (c *CfClient) IsInitialized() (bool, error) {
return false, fmt.Errorf("timeout waiting to initialize")
}

func (c *CfClient) retrieve(ctx context.Context) {
// check for first cycle of cron job
// for registering stream consumer
c.config.Logger.Info("Polling")
func (c *CfClient) retrieve(ctx context.Context) bool {
ok := true
var wg sync.WaitGroup
wg.Add(2)
go func() {
Expand All @@ -131,6 +130,7 @@ func (c *CfClient) retrieve(ctx context.Context) {
defer cancel()
err := c.retrieveFlags(rCtx)
if err != nil {
ok = false
c.config.Logger.Errorf("error while retrieving flags: %v", err.Error())
}
}()
Expand All @@ -141,16 +141,30 @@ func (c *CfClient) retrieve(ctx context.Context) {
defer cancel()
err := c.retrieveSegments(rCtx)
if err != nil {
c.config.Logger.Errorf("error while retrieving segments at startup: %v", err.Error())
ok = false
c.config.Logger.Errorf("error while retrieving segments: %v", err.Error())
}
}()
wg.Wait()
if ok {
c.config.Logger.Info("Data poll finished successfully")
} else {
c.config.Logger.Error("Data poll finished with errors")
}

return ok
}

func (c *CfClient) retrieveInitialData(ctx context.Context) {
c.retrieve(ctx)
c.initialized <- true
c.config.Logger.Info("Sync run finished")
}

func (c *CfClient) streamConnect() {
if !c.config.enableStream {
// we only ever want one stream to be setup - other threads must wait before trying to establish a connection
c.streamConnectedLock.Lock()
defer c.streamConnectedLock.Unlock()
if !c.config.enableStream || c.streamConnected {
return
}

Expand All @@ -160,25 +174,22 @@ func (c *CfClient) streamConnect() {
defer c.mux.RUnlock()
c.config.Logger.Info("Registering SSE consumer")
sseClient := sse.NewClient(fmt.Sprintf("%s/stream?cluster=%s", c.config.url, c.clusterIdentifier))
conn := stream.NewSSEClient(c.sdkKey, c.token, sseClient, c.config.Cache, c.api, c.config.Logger)
err := conn.Connect(c.environmentID)
if err != nil {
c.streamConnected = false
return
}

c.streamConnected = true
err = conn.OnDisconnect(func() error {
streamErr := func() {
c.config.Logger.Error("Stream disconnected. Swapping to polling mode")
// Wait one minute before moving to polling
time.Sleep(1 * time.Minute)
c.mux.RLock()
defer c.mux.RUnlock()
c.streamConnected = false
return nil
})
if err != nil {
c.config.Logger.Errorf("error disconnecting the stream, err: %v", err)
}
conn := stream.NewSSEClient(c.sdkKey, c.token, sseClient, c.config.Cache, c.api, c.config.Logger, streamErr)

// Connect kicks off a goroutine that attempts to establish a stream connection
// while this is happening we set streamConnected to true - if any errors happen
// in this process streamConnected will be set back to false by the streamErr function
conn.Connect(c.environmentID)
c.streamConnected = true
davejohnston marked this conversation as resolved.
Show resolved Hide resolved
}

func (c *CfClient) authenticate(ctx context.Context, target evaluation.Target) {
Expand Down Expand Up @@ -291,10 +302,12 @@ func (c *CfClient) pullCronJob(ctx context.Context) {
case <-pullingTicker.C:
c.mux.RLock()
if !c.streamConnected {
c.retrieve(ctx)
if c.config.enableStream {
ok := c.retrieve(ctx)
// we should only try and start the stream after the poll succeeded to make sure we get the latest changes
if ok && c.config.enableStream {
// here stream is enabled but not connected, so we attempt to reconnect
go c.streamConnect()
c.config.Logger.Info("Attempting to start stream")
c.streamConnect()
}
}

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ require (
github.com/stretchr/testify v1.5.1
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.16.0
gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect
)
45 changes: 23 additions & 22 deletions stream/sse.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,19 @@ import (
"github.com/harness/ff-golang-server-sdk/dto"
"github.com/harness/ff-golang-server-sdk/logger"
"github.com/harness/ff-golang-server-sdk/rest"
backoff "gopkg.in/cenkalti/backoff.v1"

jsoniter "github.com/json-iterator/go"
"github.com/r3labs/sse"
)

// SSEClient is Server Send Event object
type SSEClient struct {
api rest.ClientWithResponsesInterface
client *sse.Client
cache cache.Cache
logger logger.Logger
api rest.ClientWithResponsesInterface
client *sse.Client
cache cache.Cache
logger logger.Logger
onStreamError func()
}

var json = jsoniter.ConfigCompatibleWithStandardLibrary
Expand All @@ -32,20 +34,29 @@ func NewSSEClient(
cache cache.Cache,
api rest.ClientWithResponsesInterface,
logger logger.Logger,
onStreamError func(),
) *SSEClient {
client.Headers["Authorization"] = fmt.Sprintf("Bearer %s", token)
client.Headers["API-Key"] = apiKey
return &SSEClient{
client: client,
cache: cache,
api: api,
logger: logger,
client.OnDisconnect(func(client *sse.Client) {
onStreamError()
})
sseClient := &SSEClient{
client: client,
cache: cache,
api: api,
logger: logger,
onStreamError: onStreamError,
}
return sseClient
}

// Connect will subscribe to SSE stream
func (c *SSEClient) Connect(environment string) error {
func (c *SSEClient) Connect(environment string) {
c.logger.Infof("Start subscribing to Stream")
// don't use the default exponentialBackoff strategy - we have our own disconnect logic
// of polling the service then re-establishing a new stream once we can connect
c.client.ReconnectStrategy = &backoff.StopBackOff{}
// it is blocking operation, it needs to go in go routine
go func() {
err := c.client.Subscribe("*", func(msg *sse.Event) {
Expand Down Expand Up @@ -123,18 +134,8 @@ func (c *SSEClient) Connect(environment string) error {
}
})
if err != nil {
c.logger.Errorf("Error: %s", err.Error())
c.logger.Errorf("Error initializing stream: %s", err.Error())
c.onStreamError()
}
}()
return nil
}

// OnDisconnect will trigger func f when stream disconnects
func (c *SSEClient) OnDisconnect(f func() error) error {
c.client.OnDisconnect(func(client *sse.Client) {
if err := f(); err != nil {
c.logger.Errorf("error invoking func on stream disconnect, err: %s", err.Error())
}
})
return nil
}