Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming connections are now closed when you call Close on a client #79

Merged
merged 1 commit into from
Apr 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 46 additions & 13 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package client
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/harness/ff-golang-server-sdk/evaluation"
Expand Down Expand Up @@ -46,7 +48,6 @@ type CfClient struct {
config *config
environmentID string
token string
cancelFunc context.CancelFunc
streamConnected bool
streamConnectedLock sync.RWMutex
authenticated chan struct{}
Expand All @@ -55,17 +56,14 @@ type CfClient struct {
initializedLock sync.RWMutex
analyticsService *analyticsservice.AnalyticsService
clusterIdentifier string
stop chan struct{}
stopped *atomicBool
}

// NewCfClient creates a new client instance that connects to CF with the default configuration.
// For advanced configuration options use ConfigOptions functions
func NewCfClient(sdkKey string, options ...ConfigOption) (*CfClient, error) {

var (
ctx context.Context
err error
)

// functional options for config
config := newDefaultConfig()
for _, opt := range options {
Expand All @@ -81,8 +79,9 @@ func NewCfClient(sdkKey string, options ...ConfigOption) (*CfClient, error) {
analyticsService: analyticsService,
clusterIdentifier: "1",
postEvalChan: make(chan evaluation.PostEvalData),
stop: make(chan struct{}),
stopped: newAtomicBool(false),
}
ctx, client.cancelFunc = context.WithCancel(context.Background())

if sdkKey == "" {
return client, types.ErrSdkCantBeEmpty
Expand All @@ -97,13 +96,20 @@ func NewCfClient(sdkKey string, options ...ConfigOption) (*CfClient, error) {
return nil, err
}

go client.initAuthentication(ctx)

go client.setAnalyticsServiceClient(ctx)
client.start()
return client, nil
}

go client.pullCronJob(ctx)
func (c *CfClient) start() {
ctx, cancel := context.WithCancel(context.Background())
go func() {
<-c.stop
cancel()
}()

return client, nil
go c.initAuthentication(ctx)
go c.setAnalyticsServiceClient(ctx)
go c.pullCronJob(ctx)
}

// PostEvaluateProcessor push the data to the analytics service
Expand Down Expand Up @@ -432,7 +438,12 @@ func (c *CfClient) JSONVariation(key string, target *evaluation.Target, defaultV
// Close shuts down the Feature Flag client. After calling this, the client
// should no longer be used
func (c *CfClient) Close() error {
c.cancelFunc()
if c.stopped.get() {
return errors.New("client already closed")
}
close(c.stop)

c.stopped.set(true)
return nil
}

Expand All @@ -448,3 +459,25 @@ func (c *CfClient) InterceptAddCluster(ctx context.Context, req *http.Request) e
req.URL.RawQuery = q.Encode()
return nil
}

type atomicBool struct {
flag int32
}

func newAtomicBool(value bool) *atomicBool {
b := new(atomicBool)
b.set(value)
return b
}

func (a *atomicBool) set(value bool) {
var i int32 = 0
if value {
i = 1
}
atomic.StoreInt32(&(a.flag), i)
}

func (a *atomicBool) get() bool {
return atomic.LoadInt32(&(a.flag)) != int32(0)
}
13 changes: 13 additions & 0 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,3 +199,16 @@ var FeatureConfigsResponse = func(req *http.Request) (*http.Response, error) {

return httpmock.NewJsonResponse(200, FeatureConfigResponse)
}

func TestCfClient_Close(t *testing.T) {
client, err := newClient(&http.Client{})
if err != nil {
t.Error(err)
}

t.Log("When I close the client for the first time I should not get an error")
assert.Nil(t, client.Close())

t.Log("When I close the client for the second time I should an error")
assert.NotNil(t, client.Close())
}