diff --git a/client/client.go b/client/client.go index d775cbff..984ad53b 100644 --- a/client/client.go +++ b/client/client.go @@ -212,6 +212,16 @@ func (c *CfClient) streamConnect(ctx context.Context) { c.mux.RLock() defer c.mux.RUnlock() c.streamConnected = false + + // If an eventStreamListener has been passed to the Proxy lets notify it of the disconnected + // to let it know something is up with the stream it has been listening to + if c.config.eventStreamListener != nil { + c.config.eventStreamListener.Pub(context.Background(), stream.Event{ + APIKey: c.sdkKey, + Environment: c.environmentID, + Err: stream.ErrStreamDisconnect, + }) + } } conn := stream.NewSSEClient(c.sdkKey, c.token, sseClient, c.repository, c.api, c.config.Logger, streamErr, c.config.eventStreamListener) diff --git a/stream/sse.go b/stream/sse.go index 8a514466..e12041e2 100644 --- a/stream/sse.go +++ b/stream/sse.go @@ -98,8 +98,13 @@ func (c *SSEClient) subscribe(ctx context.Context, environment string, apiKey st }) if err != nil { c.logger.Errorf("Error initializing stream: %s", err.Error()) - c.onStreamError() } + + // The SSE library we use swallows the EOF error returned if a connection is closed by the server + // so we need to call onStreamError any time we've exited SubscribeWithContext. If we don't do + // this and the server closes the connection the Go SDK will still think it's connected to the stream + // even though it isn't + c.onStreamError() }() return out diff --git a/stream/stream.go b/stream/stream.go index ffc15e52..1d35e945 100644 --- a/stream/stream.go +++ b/stream/stream.go @@ -2,10 +2,14 @@ package stream import ( "context" + "errors" "github.com/r3labs/sse" ) +// ErrStreamDisconnect is a stream disconnect error +var ErrStreamDisconnect error = errors.New("stream disconnect") + // Connection is simple interface for streams type Connection interface { Connect(environment string) error @@ -28,4 +32,9 @@ type Event struct { Environment string // SSEEvent is the SSEEvent that was sent from the FeatureFlags server to the SDK SSEEvent *sse.Event + + // Err holds any errors encountered by the sdk while listening on the stream and this + // field should be used to pass those errors on to the EventStreamListener to let it + // know something has gone wrong on the stream it's listening on + Err error }