diff --git a/cache/redis_client.go b/cache/redis_client.go index d1b8890f..f934661d 100644 --- a/cache/redis_client.go +++ b/cache/redis_client.go @@ -5,6 +5,8 @@ import ( "encoding" "fmt" + "github.com/harness/ff-proxy/stream" + "github.com/go-redis/redis/v8" "github.com/harness/ff-proxy/domain" ) @@ -117,10 +119,10 @@ func (r *RedisCache) HealthCheck(ctx context.Context) error { // Pub publishes the passed values to a topic. If the topic doesn't exist Pub // will create it as well as publishing the values to it. -func (r *RedisCache) Pub(ctx context.Context, topic string, event domain.StreamEvent) error { +func (r *RedisCache) Pub(ctx context.Context, topic string, event stream.StreamEvent) error { values := map[string]interface{}{ - domain.StreamEventValueAPIKey.String(): event.Values[domain.StreamEventValueAPIKey], - domain.StreamEventValueData.String(): event.Values[domain.StreamEventValueData], + stream.StreamEventValueAPIKey.String(): event.Values[stream.StreamEventValueAPIKey], + stream.StreamEventValueData.String(): event.Values[stream.StreamEventValueData], } err := r.client.XAdd(ctx, &redis.XAddArgs{ @@ -140,8 +142,8 @@ func (r *RedisCache) Pub(ctx context.Context, topic string, event domain.StreamE // and will only exit if there is an error receiving on the redis stream or if // the context is canceled. If the checkpoint is empty the default behaviour is to // start listening for the next event on the stream. -func (r *RedisCache) Sub(ctx context.Context, topic string, checkpoint string, onReceive func(event domain.StreamEvent)) error { - stream := fmt.Sprintf("stream-%s", topic) +func (r *RedisCache) Sub(ctx context.Context, topic string, checkpoint string, onReceive func(event stream.StreamEvent)) error { + streamID := fmt.Sprintf("stream-%s", topic) if checkpoint == "" { checkpoint = "$" @@ -153,23 +155,23 @@ func (r *RedisCache) Sub(ctx context.Context, topic string, checkpoint string, o return ctx.Err() default: xstreams, err := r.client.XRead(ctx, &redis.XReadArgs{ - Streams: []string{stream, checkpoint}, + Streams: []string{streamID, checkpoint}, Block: 0, }).Result() if err != nil { return err } - for _, stream := range xstreams { - for _, msg := range stream.Messages { + for _, xstream := range xstreams { + for _, msg := range xstream.Messages { checkpoint = msg.ID - event, err := domain.NewStreamEventFromMap(msg.Values) + event, err := stream.NewStreamEventFromMap(msg.Values) if err != nil { return err } - event.Checkpoint, err = domain.NewCheckpoint(msg.ID) + event.Checkpoint, err = stream.NewCheckpoint(msg.ID) if err != nil { return err } diff --git a/cache/redis_client_test.go b/cache/redis_client_test.go index 57f39b91..883b11da 100644 --- a/cache/redis_client_test.go +++ b/cache/redis_client_test.go @@ -5,8 +5,9 @@ import ( "errors" "testing" + "github.com/harness/ff-proxy/stream" + "github.com/go-redis/redis/v8" - "github.com/harness/ff-proxy/domain" "github.com/stretchr/testify/assert" ) @@ -56,9 +57,9 @@ func TestRedisCache_Pub(t *testing.T) { t.Run(desc, func(t *testing.T) { rc := RedisCache{client: tc.mockRedis} - event := domain.StreamEvent{ - Values: map[domain.StreamEventValue]string{ - domain.StreamEventValueData: "hello world", + event := stream.StreamEvent{ + Values: map[stream.StreamEventValue]string{ + stream.StreamEventValueData: "hello world", }, } @@ -103,33 +104,33 @@ func TestRedisCache_Sub(t *testing.T) { testCases := map[string]struct { mockRedis mockRedis shouldErr bool - expected []domain.StreamEvent + expected []stream.StreamEvent }{ "Given I have a redis client that errors reading from a stream": { mockRedis: mockRedis{ xread: xreadError, }, shouldErr: true, - expected: []domain.StreamEvent{}, + expected: []stream.StreamEvent{}, }, "Given I have a redis client that reads from a stream successfully": { mockRedis: mockRedis{ xread: xreadSuccess, }, shouldErr: false, - expected: []domain.StreamEvent{ + expected: []stream.StreamEvent{ { - Checkpoint: domain.Checkpoint("1642764292396-0"), - Values: map[domain.StreamEventValue]string{ - domain.StreamEventValueAPIKey: "123", - domain.StreamEventValueData: "hello world", + Checkpoint: stream.Checkpoint("1642764292396-0"), + Values: map[stream.StreamEventValue]string{ + stream.StreamEventValueAPIKey: "123", + stream.StreamEventValueData: "hello world", }, }, { - Checkpoint: domain.Checkpoint("1642764292396-0"), - Values: map[domain.StreamEventValue]string{ - domain.StreamEventValueAPIKey: "123", - domain.StreamEventValueData: "foo bar", + Checkpoint: stream.Checkpoint("1642764292396-0"), + Values: map[stream.StreamEventValue]string{ + stream.StreamEventValueAPIKey: "123", + stream.StreamEventValueData: "foo bar", }, }, }, @@ -142,11 +143,11 @@ func TestRedisCache_Sub(t *testing.T) { rc := NewRedisCache(tc.mockRedis) - actual := []domain.StreamEvent{} + actual := []stream.StreamEvent{} ctx, cancel := context.WithCancel(context.Background()) defer cancel() - err := rc.Sub(ctx, "foo", "", func(event domain.StreamEvent) { + err := rc.Sub(ctx, "foo", "", func(event stream.StreamEvent) { actual = append(actual, event) if len(actual) == 2 { cancel() diff --git a/cmd/ff-proxy/main.go b/cmd/ff-proxy/main.go index 3ae1a6f0..fa46a911 100644 --- a/cmd/ff-proxy/main.go +++ b/cmd/ff-proxy/main.go @@ -13,6 +13,10 @@ import ( "sync" "time" + "github.com/harness/ff-proxy/token" + + "github.com/harness/ff-proxy/stream" + "github.com/harness/ff-proxy/export" "github.com/fanout/go-gripcontrol" @@ -25,8 +29,7 @@ import ( gosdkCache "github.com/harness/ff-golang-server-sdk/cache" harness "github.com/harness/ff-golang-server-sdk/client" "github.com/harness/ff-golang-server-sdk/logger" - "github.com/harness/ff-golang-server-sdk/stream" - ffproxy "github.com/harness/ff-proxy" + sdkStream "github.com/harness/ff-golang-server-sdk/stream" "github.com/harness/ff-proxy/cache" "github.com/harness/ff-proxy/config" "github.com/harness/ff-proxy/domain" @@ -223,7 +226,7 @@ func init() { flag.Parse() } -func initFF(ctx context.Context, cache gosdkCache.Cache, baseURL, eventURL, envID, envIdent, projectIdent, sdkKey string, l log.Logger, eventListener stream.EventStreamListener) { +func initFF(ctx context.Context, cache gosdkCache.Cache, baseURL, eventURL, envID, envIdent, projectIdent, sdkKey string, l log.Logger, eventListener sdkStream.EventStreamListener) { retryClient := retryablehttp.NewClient() retryClient.RetryMax = 5 @@ -395,9 +398,9 @@ func main() { projEnvInfo := envIDToProjectEnvironmentInfo[authConfig[domain.AuthAPIKey(apiKeyHash)]] // Start an event listener for each embedded SDK - var eventListener stream.EventStreamListener + var eventListener sdkStream.EventStreamListener if rc, ok := sdkCache.(*cache.RedisCache); ok { - eventListener = ffproxy.NewEventListener(logger, rc, apiKeyHasher) + eventListener = stream.NewEventListener(logger, rc, apiKeyHasher) } else { logger.Info("proxy is not configured with a redis cache, therefore streaming will not be enabled") } @@ -462,15 +465,15 @@ func main() { if rc, ok := sdkCache.(*cache.RedisCache); ok { logger.Info("starting stream worker...") - sc := ffproxy.NewCheckpointingStream(ctx, rc, rc, logger) - streamWorker := ffproxy.NewStreamWorker(logger, gpc, sc, t...) + sc := stream.NewCheckpointingStream(ctx, rc, rc, logger) + streamWorker := stream.NewStreamWorker(logger, gpc, sc, t...) streamWorker.Run(ctx) streamingEnabled = true } else { logger.Info("the proxy isn't configured with redis so the streamworker will not be started ") } - tokenSource := ffproxy.NewTokenSource(logger, authRepo, apiKeyHasher, []byte(authSecret)) + tokenSource := token.NewTokenSource(logger, authRepo, apiKeyHasher, []byte(authSecret)) featureEvaluator := proxyservice.NewFeatureEvaluator() diff --git a/checkpointing_stream.go b/stream/checkpointing_stream.go similarity index 85% rename from checkpointing_stream.go rename to stream/checkpointing_stream.go index 124fc809..704030ef 100644 --- a/checkpointing_stream.go +++ b/stream/checkpointing_stream.go @@ -1,10 +1,9 @@ -package ffproxy +package stream import ( "context" "fmt" - "github.com/harness/ff-proxy/domain" "github.com/harness/ff-proxy/log" ) @@ -16,10 +15,10 @@ type Checkpointer interface { type checkpoint struct { key string - value domain.Checkpoint + value Checkpoint } -func newCheckpoint(key string, value domain.Checkpoint) checkpoint { +func newCheckpoint(key string, value Checkpoint) checkpoint { return checkpoint{key: key, value: value} } @@ -28,7 +27,7 @@ type checkpoints chan checkpoint // CheckpointingStream is a stream that stores checkpoints of the last event // processed so it can resume from this point in the event of a failure. type CheckpointingStream struct { - stream domain.Stream + stream Stream checkpoint Checkpointer log log.Logger checkpoints checkpoints @@ -36,7 +35,7 @@ type CheckpointingStream struct { // NewCheckpointingStream creates a CheckpointingStream and starts a process // that listens for checkpoints -func NewCheckpointingStream(ctx context.Context, s domain.Stream, c Checkpointer, l log.Logger) CheckpointingStream { +func NewCheckpointingStream(ctx context.Context, s Stream, c Checkpointer, l log.Logger) CheckpointingStream { l = l.With("component", "StreamWorker") sc := CheckpointingStream{stream: s, checkpoint: c, log: l, checkpoints: make(checkpoints)} sc.setCheckpoints(ctx) @@ -60,7 +59,7 @@ func (s CheckpointingStream) setCheckpoints(ctx context.Context) { } oldCheckpoint := s.fetchCheckpoint(ctx, c.key) - if c.value.IsOlder(domain.Checkpoint(oldCheckpoint)) { + if c.value.IsOlder(Checkpoint(oldCheckpoint)) { continue } @@ -75,7 +74,7 @@ func (s CheckpointingStream) setCheckpoints(ctx context.Context) { } // Pub makes CheckpointingStream implement the Stream interface. -func (s CheckpointingStream) Pub(ctx context.Context, topic string, values domain.StreamEvent) error { +func (s CheckpointingStream) Pub(ctx context.Context, topic string, values StreamEvent) error { return s.stream.Pub(ctx, topic, values) } @@ -85,14 +84,14 @@ func (s CheckpointingStream) Pub(ctx context.Context, topic string, values domai // checkpoint then the point at which it begins subscribing to the stream is determined // by the implementation of the underlying Stream that the CheckpointingStream // uses. -func (s CheckpointingStream) Sub(ctx context.Context, topic string, checkpoint string, onReceive func(domain.StreamEvent)) error { +func (s CheckpointingStream) Sub(ctx context.Context, topic string, checkpoint string, onReceive func(StreamEvent)) error { key := fmt.Sprintf("checkpoint-%s", topic) if checkpoint == "" { checkpoint = s.fetchCheckpoint(ctx, topic) } - err := s.stream.Sub(ctx, topic, checkpoint, func(e domain.StreamEvent) { + err := s.stream.Sub(ctx, topic, checkpoint, func(e StreamEvent) { onReceive(e) select { diff --git a/checkpointing_stream_test.go b/stream/checkpointing_stream_test.go similarity index 80% rename from checkpointing_stream_test.go rename to stream/checkpointing_stream_test.go index 9c070876..f66611f2 100644 --- a/checkpointing_stream_test.go +++ b/stream/checkpointing_stream_test.go @@ -1,4 +1,4 @@ -package ffproxy +package stream import ( "context" @@ -8,7 +8,6 @@ import ( "testing" "time" - "github.com/harness/ff-proxy/domain" "github.com/harness/ff-proxy/log" "github.com/stretchr/testify/assert" ) @@ -56,11 +55,11 @@ func (m *mockCheckpointer) GetKV(ctx context.Context, key string) (string, error return cp, nil } -var testEvents = []domain.StreamEvent{ - {Checkpoint: "1-0", Values: map[domain.StreamEventValue]string{}}, - {Checkpoint: "2-0", Values: map[domain.StreamEventValue]string{}}, - {Checkpoint: "3-1", Values: map[domain.StreamEventValue]string{}}, - {Checkpoint: "3-2", Values: map[domain.StreamEventValue]string{}}, +var testEvents = []StreamEvent{ + {Checkpoint: "1-0", Values: map[StreamEventValue]string{}}, + {Checkpoint: "2-0", Values: map[StreamEventValue]string{}}, + {Checkpoint: "3-1", Values: map[StreamEventValue]string{}}, + {Checkpoint: "3-2", Values: map[StreamEventValue]string{}}, } func TestStreamCheckpointer_Sub(t *testing.T) { @@ -71,7 +70,7 @@ func TestStreamCheckpointer_Sub(t *testing.T) { checkpointer *mockCheckpointer checkpoint string shouldErr bool - expected []domain.StreamEvent + expected []StreamEvent }{ "Given I call Sub with a stream that errors": { stream: newMockStream(errors.New("error"), topic, testEvents...), @@ -103,8 +102,8 @@ func TestStreamCheckpointer_Sub(t *testing.T) { cs := NewCheckpointingStream(ctx, tc.stream, tc.checkpointer, log.NewNoOpLogger()) - actual := []domain.StreamEvent{} - err := cs.Sub(ctx, topic, tc.checkpoint, func(event domain.StreamEvent) { + actual := []StreamEvent{} + err := cs.Sub(ctx, topic, tc.checkpoint, func(event StreamEvent) { actual = append(actual, event) }) @@ -118,11 +117,11 @@ func TestStreamCheckpointer_Sub(t *testing.T) { } func TestCheckpointStream_SetCheckpoint(t *testing.T) { - outOfOrderTestEvents := []domain.StreamEvent{ - {Checkpoint: "2-0", Values: map[domain.StreamEventValue]string{}}, - {Checkpoint: "3-2", Values: map[domain.StreamEventValue]string{}}, - {Checkpoint: "3-1", Values: map[domain.StreamEventValue]string{}}, - {Checkpoint: "1-0", Values: map[domain.StreamEventValue]string{}}, + outOfOrderTestEvents := []StreamEvent{ + {Checkpoint: "2-0", Values: map[StreamEventValue]string{}}, + {Checkpoint: "3-2", Values: map[StreamEventValue]string{}}, + {Checkpoint: "3-1", Values: map[StreamEventValue]string{}}, + {Checkpoint: "1-0", Values: map[StreamEventValue]string{}}, } topic := "test-topic" @@ -130,7 +129,7 @@ func TestCheckpointStream_SetCheckpoint(t *testing.T) { testCases := map[string]struct { expectedCheckpoint string checkpointer *mockCheckpointer - events []domain.StreamEvent + events []StreamEvent shouldErr bool sets int }{ @@ -160,7 +159,7 @@ func TestCheckpointStream_SetCheckpoint(t *testing.T) { i := 0 newCtx, cancel := context.WithCancel(ctx) - err := cs.Sub(newCtx, topic, "", func(e domain.StreamEvent) { + err := cs.Sub(newCtx, topic, "", func(e StreamEvent) { if i == len(tc.events) { cancel() } diff --git a/event_listener.go b/stream/event_listener.go similarity index 75% rename from event_listener.go rename to stream/event_listener.go index 8d7d80f4..f7b4cea3 100644 --- a/event_listener.go +++ b/stream/event_listener.go @@ -1,4 +1,4 @@ -package ffproxy +package stream import ( "context" @@ -6,7 +6,6 @@ import ( "fmt" "github.com/harness/ff-golang-server-sdk/stream" - "github.com/harness/ff-proxy/domain" "github.com/harness/ff-proxy/hash" "github.com/harness/ff-proxy/log" ) @@ -16,12 +15,12 @@ import ( // it by the FeatureFlag server. type EventListener struct { log log.Logger - stream domain.Stream + stream Stream hasher hash.Hasher } // NewEventListener creates an EventListener -func NewEventListener(l log.Logger, s domain.Stream, h hash.Hasher) EventListener { +func NewEventListener(l log.Logger, s Stream, h hash.Hasher) EventListener { l = l.With("component", "EventListener") return EventListener{ log: l, @@ -40,12 +39,12 @@ func (e EventListener) Pub(ctx context.Context, event stream.Event) error { topic := event.Environment content := fmt.Sprintf("event: *\ndata: %s\n\n", event.SSEEvent.Data) - values := map[domain.StreamEventValue]string{ - domain.StreamEventValueAPIKey: topic, - domain.StreamEventValueData: content, + values := map[StreamEventValue]string{ + StreamEventValueAPIKey: topic, + StreamEventValueData: content, } - if err := e.stream.Pub(ctx, topic, domain.NewStreamEvent(values)); err != nil { + if err := e.stream.Pub(ctx, topic, NewStreamEvent(values)); err != nil { e.log.Error("failed to publish event to stream", "err", err) return err } diff --git a/event_listener_test.go b/stream/event_listener_test.go similarity index 71% rename from event_listener_test.go rename to stream/event_listener_test.go index 874b9654..b3dc2c81 100644 --- a/event_listener_test.go +++ b/stream/event_listener_test.go @@ -1,4 +1,4 @@ -package ffproxy +package stream import ( "context" @@ -7,7 +7,6 @@ import ( "testing" "github.com/harness/ff-golang-server-sdk/stream" - "github.com/harness/ff-proxy/domain" "github.com/harness/ff-proxy/hash" "github.com/harness/ff-proxy/log" "github.com/r3labs/sse" @@ -15,20 +14,20 @@ import ( ) type mockStream struct { - data map[string][]domain.StreamEvent + data map[string][]StreamEvent err error } -func newMockStream(err error, topic string, events ...domain.StreamEvent) mockStream { +func newMockStream(err error, topic string, events ...StreamEvent) mockStream { return mockStream{ - data: map[string][]domain.StreamEvent{ + data: map[string][]StreamEvent{ topic: events, }, err: err, } } -func (m mockStream) Sub(ctx context.Context, topic string, checkpoint string, onReceive func(domain.StreamEvent)) error { +func (m mockStream) Sub(ctx context.Context, topic string, checkpoint string, onReceive func(StreamEvent)) error { if m.err != nil { return m.err } @@ -39,7 +38,7 @@ func (m mockStream) Sub(ctx context.Context, topic string, checkpoint string, on } for _, event := range events { - if domain.Checkpoint(event.Checkpoint).IsOlder(domain.Checkpoint(checkpoint)) { + if Checkpoint(event.Checkpoint).IsOlder(Checkpoint(checkpoint)) { continue } @@ -48,7 +47,7 @@ func (m mockStream) Sub(ctx context.Context, topic string, checkpoint string, on return nil } -func (m mockStream) Pub(ctx context.Context, topic string, event domain.StreamEvent) error { +func (m mockStream) Pub(ctx context.Context, topic string, event StreamEvent) error { if m.err != nil { return m.err } @@ -65,11 +64,11 @@ func TestEventListener_Pub(t *testing.T) { stream mockStream event stream.Event shouldErr bool - expected map[string][]domain.StreamEvent + expected map[string][]StreamEvent }{ "Given I try to publish an event containing a nil SSEEvent": { stream: mockStream{ - data: make(map[string][]domain.StreamEvent), + data: make(map[string][]StreamEvent), err: nil, }, event: stream.Event{ @@ -78,11 +77,11 @@ func TestEventListener_Pub(t *testing.T) { SSEEvent: nil, }, shouldErr: true, - expected: map[string][]domain.StreamEvent{}, + expected: map[string][]StreamEvent{}, }, "Given I have a mockStream that errors when the EventListener tries to publish to it": { stream: mockStream{ - data: make(map[string][]domain.StreamEvent), + data: make(map[string][]StreamEvent), err: errors.New("pub err"), }, event: stream.Event{ @@ -96,11 +95,11 @@ func TestEventListener_Pub(t *testing.T) { }, }, shouldErr: true, - expected: map[string][]domain.StreamEvent{}, + expected: map[string][]StreamEvent{}, }, "Given I have a mockStream that doesn't error when the EventListener tries to publish to it": { stream: mockStream{ - data: make(map[string][]domain.StreamEvent), + data: make(map[string][]StreamEvent), err: nil, }, event: stream.Event{ @@ -114,13 +113,13 @@ func TestEventListener_Pub(t *testing.T) { }, }, shouldErr: false, - expected: map[string][]domain.StreamEvent{ - envID: []domain.StreamEvent{ + expected: map[string][]StreamEvent{ + envID: []StreamEvent{ { - Checkpoint: domain.Checkpoint(""), - Values: map[domain.StreamEventValue]string{ - domain.StreamEventValueAPIKey: envID, - domain.StreamEventValueData: fmt.Sprintf("event: *\ndata: foo\n\n"), + Checkpoint: Checkpoint(""), + Values: map[StreamEventValue]string{ + StreamEventValueAPIKey: envID, + StreamEventValueData: fmt.Sprintf("event: *\ndata: foo\n\n"), }, }, }, diff --git a/stream_worker.go b/stream/stream_worker.go similarity index 91% rename from stream_worker.go rename to stream/stream_worker.go index 3ed92e7c..fcab1e9e 100644 --- a/stream_worker.go +++ b/stream/stream_worker.go @@ -1,10 +1,9 @@ -package ffproxy +package stream import ( "context" "time" - "github.com/harness/ff-proxy/domain" "github.com/harness/ff-proxy/log" ) @@ -29,12 +28,12 @@ type streamEvent struct { type StreamWorker struct { log log.Logger gpc GripStream - stream domain.Stream + stream Stream topics []string } // NewStreamWorker creates a StreamWorker -func NewStreamWorker(l log.Logger, gpc GripStream, stream domain.Stream, topics ...string) StreamWorker { +func NewStreamWorker(l log.Logger, gpc GripStream, stream Stream, topics ...string) StreamWorker { l = l.With("component", "StreamWorker") return StreamWorker{ log: l, @@ -114,9 +113,9 @@ func (s StreamWorker) subscribe(ctx context.Context, topic string) <-chan stream close(out) }() - err := s.stream.Sub(ctx, topic, "", func(event domain.StreamEvent) { - apiKey := event.Values[domain.StreamEventValueAPIKey] - content := event.Values[domain.StreamEventValueData] + err := s.stream.Sub(ctx, topic, "", func(event StreamEvent) { + apiKey := event.Values[StreamEventValueAPIKey] + content := event.Values[StreamEventValueData] select { case <-ctx.Done(): diff --git a/domain/streaming.go b/stream/streaming.go similarity index 99% rename from domain/streaming.go rename to stream/streaming.go index d18bfc6f..c15d05ae 100644 --- a/domain/streaming.go +++ b/stream/streaming.go @@ -1,4 +1,4 @@ -package domain +package stream import ( "context" diff --git a/domain/streaming_test.go b/stream/streaming_test.go similarity index 99% rename from domain/streaming_test.go rename to stream/streaming_test.go index 3227cdda..b718532c 100644 --- a/domain/streaming_test.go +++ b/stream/streaming_test.go @@ -1,4 +1,4 @@ -package domain +package stream import ( "testing" diff --git a/token_source.go b/token/token_source.go similarity index 98% rename from token_source.go rename to token/token_source.go index 494b53d1..99a36a23 100644 --- a/token_source.go +++ b/token/token_source.go @@ -1,4 +1,4 @@ -package ffproxy +package token import ( "context" diff --git a/token_source_test.go b/token/token_source_test.go similarity index 99% rename from token_source_test.go rename to token/token_source_test.go index f23bb81a..af25ac8f 100644 --- a/token_source_test.go +++ b/token/token_source_test.go @@ -1,15 +1,16 @@ -package ffproxy +package token import ( "testing" "time" + "github.com/harness/ff-proxy/hash" + "github.com/harness/ff-proxy/cache" "github.com/harness/ff-proxy/log" "github.com/golang-jwt/jwt/v4" "github.com/harness/ff-proxy/domain" - "github.com/harness/ff-proxy/hash" "github.com/harness/ff-proxy/repository" "github.com/stretchr/testify/assert" ) diff --git a/transport/http_server_test.go b/transport/http_server_test.go index d018525f..d52b429d 100644 --- a/transport/http_server_test.go +++ b/transport/http_server_test.go @@ -14,10 +14,13 @@ import ( "reflect" "testing" + "github.com/harness/ff-proxy/token" + + "github.com/harness/ff-proxy/stream" + "github.com/fanout/go-gripcontrol" "github.com/go-redis/redis/v8" - "github.com/harness/ff-golang-server-sdk/stream" - ffproxy "github.com/harness/ff-proxy" + sdkstream "github.com/harness/ff-golang-server-sdk/stream" "github.com/harness/ff-proxy/cache" "github.com/harness/ff-proxy/config" "github.com/harness/ff-proxy/domain" @@ -84,8 +87,8 @@ type setupConfig struct { cacheHealthFn proxyservice.CacheHealthFn envHealthFn proxyservice.EnvHealthFn clientService *mockClientService - streamWorker ffproxy.StreamWorker - eventListener stream.EventStreamListener + streamWorker stream.StreamWorker + eventListener sdkstream.EventStreamListener cache cache.Cache metricService *mockMetricService } @@ -220,7 +223,7 @@ func setupHTTPServer(t *testing.T, bypassAuth bool, opts ...setupOpts) *HTTPServ logger := log.NoOpLogger{} - tokenSource := ffproxy.NewTokenSource(logger, setupConfig.authRepo, hash.NewSha256(), []byte(`secret`)) + tokenSource := token.NewTokenSource(logger, setupConfig.authRepo, hash.NewSha256(), []byte(`secret`)) var service proxyservice.ProxyService service = proxyservice.NewService(proxyservice.Config{ @@ -1311,7 +1314,7 @@ func TestHTTPServer_StreamIntegration(t *testing.T) { hasher := hash.NewSha256() logger := log.NewNoOpLogger() - sdkEvents := []stream.Event{ + sdkEvents := []sdkstream.Event{ { Environment: envID, SSEEvent: &sse.Event{ @@ -1363,7 +1366,7 @@ func TestHTTPServer_StreamIntegration(t *testing.T) { apiKeys []string topics []string numClients int - sdkEvents []stream.Event + sdkEvents []sdkstream.Event expectedEvents []*sse.Event expectedStatusCode int }{ @@ -1405,9 +1408,9 @@ func TestHTTPServer_StreamIntegration(t *testing.T) { }, }) - eventListener := ffproxy.NewEventListener(log.NewNoOpLogger(), cache, hasher) - checkpointingStream := ffproxy.NewCheckpointingStream(ctx, cache, cache, logger) - streamWorker := ffproxy.NewStreamWorker(logger, gpc, checkpointingStream, tc.topics...) + eventListener := stream.NewEventListener(log.NewNoOpLogger(), cache, hasher) + checkpointingStream := stream.NewCheckpointingStream(ctx, cache, cache, logger) + streamWorker := stream.NewStreamWorker(logger, gpc, checkpointingStream, tc.topics...) streamWorker.Run(ctx) requests := []*http.Request{}