Skip to content

Commit

Permalink
Merge pull request #115 from harness/FFM-8067
Browse files Browse the repository at this point in the history
FFM-8067 Call onStreamError anytime we exit Subscribe
  • Loading branch information
jcox250 committed Jun 2, 2023
2 parents 97d5472 + bc397ac commit fcab188
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 1 deletion.
10 changes: 10 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,16 @@ func (c *CfClient) streamConnect(ctx context.Context) {
c.mux.RLock()
defer c.mux.RUnlock()
c.streamConnected = false

// If an eventStreamListener has been passed to the Proxy lets notify it of the disconnected
// to let it know something is up with the stream it has been listening to
if c.config.eventStreamListener != nil {
c.config.eventStreamListener.Pub(context.Background(), stream.Event{
APIKey: c.sdkKey,
Environment: c.environmentID,
Err: stream.ErrStreamDisconnect,
})
}
}
conn := stream.NewSSEClient(c.sdkKey, c.token, sseClient, c.repository, c.api, c.config.Logger, streamErr,
c.config.eventStreamListener)
Expand Down
7 changes: 6 additions & 1 deletion stream/sse.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,13 @@ func (c *SSEClient) subscribe(ctx context.Context, environment string, apiKey st
})
if err != nil {
c.logger.Errorf("Error initializing stream: %s", err.Error())
c.onStreamError()
}

// The SSE library we use swallows the EOF error returned if a connection is closed by the server
// so we need to call onStreamError any time we've exited SubscribeWithContext. If we don't do
// this and the server closes the connection the Go SDK will still think it's connected to the stream
// even though it isn't
c.onStreamError()
}()

return out
Expand Down
9 changes: 9 additions & 0 deletions stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ package stream

import (
"context"
"errors"

"github.com/r3labs/sse"
)

// ErrStreamDisconnect is a stream disconnect error
var ErrStreamDisconnect error = errors.New("stream disconnect")

// Connection is simple interface for streams
type Connection interface {
Connect(environment string) error
Expand All @@ -28,4 +32,9 @@ type Event struct {
Environment string
// SSEEvent is the SSEEvent that was sent from the FeatureFlags server to the SDK
SSEEvent *sse.Event

// Err holds any errors encountered by the sdk while listening on the stream and this
// field should be used to pass those errors on to the EventStreamListener to let it
// know something has gone wrong on the stream it's listening on
Err error
}

0 comments on commit fcab188

Please sign in to comment.