Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions cache/redis_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"encoding"
"fmt"

"github.com/harness/ff-proxy/stream"

"github.com/go-redis/redis/v8"
"github.com/harness/ff-proxy/domain"
)
Expand Down Expand Up @@ -117,10 +119,10 @@ func (r *RedisCache) HealthCheck(ctx context.Context) error {

// Pub publishes the passed values to a topic. If the topic doesn't exist Pub
// will create it as well as publishing the values to it.
func (r *RedisCache) Pub(ctx context.Context, topic string, event domain.StreamEvent) error {
func (r *RedisCache) Pub(ctx context.Context, topic string, event stream.StreamEvent) error {
values := map[string]interface{}{
domain.StreamEventValueAPIKey.String(): event.Values[domain.StreamEventValueAPIKey],
domain.StreamEventValueData.String(): event.Values[domain.StreamEventValueData],
stream.StreamEventValueAPIKey.String(): event.Values[stream.StreamEventValueAPIKey],
stream.StreamEventValueData.String(): event.Values[stream.StreamEventValueData],
}

err := r.client.XAdd(ctx, &redis.XAddArgs{
Expand All @@ -140,8 +142,8 @@ func (r *RedisCache) Pub(ctx context.Context, topic string, event domain.StreamE
// and will only exit if there is an error receiving on the redis stream or if
// the context is canceled. If the checkpoint is empty the default behaviour is to
// start listening for the next event on the stream.
func (r *RedisCache) Sub(ctx context.Context, topic string, checkpoint string, onReceive func(event domain.StreamEvent)) error {
stream := fmt.Sprintf("stream-%s", topic)
func (r *RedisCache) Sub(ctx context.Context, topic string, checkpoint string, onReceive func(event stream.StreamEvent)) error {
streamID := fmt.Sprintf("stream-%s", topic)

if checkpoint == "" {
checkpoint = "$"
Expand All @@ -153,23 +155,23 @@ func (r *RedisCache) Sub(ctx context.Context, topic string, checkpoint string, o
return ctx.Err()
default:
xstreams, err := r.client.XRead(ctx, &redis.XReadArgs{
Streams: []string{stream, checkpoint},
Streams: []string{streamID, checkpoint},
Block: 0,
}).Result()
if err != nil {
return err
}

for _, stream := range xstreams {
for _, msg := range stream.Messages {
for _, xstream := range xstreams {
for _, msg := range xstream.Messages {
checkpoint = msg.ID

event, err := domain.NewStreamEventFromMap(msg.Values)
event, err := stream.NewStreamEventFromMap(msg.Values)
if err != nil {
return err
}

event.Checkpoint, err = domain.NewCheckpoint(msg.ID)
event.Checkpoint, err = stream.NewCheckpoint(msg.ID)
if err != nil {
return err
}
Expand Down
35 changes: 18 additions & 17 deletions cache/redis_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ import (
"errors"
"testing"

"github.com/harness/ff-proxy/stream"

"github.com/go-redis/redis/v8"
"github.com/harness/ff-proxy/domain"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -56,9 +57,9 @@ func TestRedisCache_Pub(t *testing.T) {
t.Run(desc, func(t *testing.T) {

rc := RedisCache{client: tc.mockRedis}
event := domain.StreamEvent{
Values: map[domain.StreamEventValue]string{
domain.StreamEventValueData: "hello world",
event := stream.StreamEvent{
Values: map[stream.StreamEventValue]string{
stream.StreamEventValueData: "hello world",
},
}

Expand Down Expand Up @@ -103,33 +104,33 @@ func TestRedisCache_Sub(t *testing.T) {
testCases := map[string]struct {
mockRedis mockRedis
shouldErr bool
expected []domain.StreamEvent
expected []stream.StreamEvent
}{
"Given I have a redis client that errors reading from a stream": {
mockRedis: mockRedis{
xread: xreadError,
},
shouldErr: true,
expected: []domain.StreamEvent{},
expected: []stream.StreamEvent{},
},
"Given I have a redis client that reads from a stream successfully": {
mockRedis: mockRedis{
xread: xreadSuccess,
},
shouldErr: false,
expected: []domain.StreamEvent{
expected: []stream.StreamEvent{
{
Checkpoint: domain.Checkpoint("1642764292396-0"),
Values: map[domain.StreamEventValue]string{
domain.StreamEventValueAPIKey: "123",
domain.StreamEventValueData: "hello world",
Checkpoint: stream.Checkpoint("1642764292396-0"),
Values: map[stream.StreamEventValue]string{
stream.StreamEventValueAPIKey: "123",
stream.StreamEventValueData: "hello world",
},
},
{
Checkpoint: domain.Checkpoint("1642764292396-0"),
Values: map[domain.StreamEventValue]string{
domain.StreamEventValueAPIKey: "123",
domain.StreamEventValueData: "foo bar",
Checkpoint: stream.Checkpoint("1642764292396-0"),
Values: map[stream.StreamEventValue]string{
stream.StreamEventValueAPIKey: "123",
stream.StreamEventValueData: "foo bar",
},
},
},
Expand All @@ -142,11 +143,11 @@ func TestRedisCache_Sub(t *testing.T) {

rc := NewRedisCache(tc.mockRedis)

actual := []domain.StreamEvent{}
actual := []stream.StreamEvent{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

err := rc.Sub(ctx, "foo", "", func(event domain.StreamEvent) {
err := rc.Sub(ctx, "foo", "", func(event stream.StreamEvent) {
actual = append(actual, event)
if len(actual) == 2 {
cancel()
Expand Down
19 changes: 11 additions & 8 deletions cmd/ff-proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ import (
"sync"
"time"

"github.com/harness/ff-proxy/token"

"github.com/harness/ff-proxy/stream"

"github.com/harness/ff-proxy/export"

"github.com/fanout/go-gripcontrol"
Expand All @@ -25,8 +29,7 @@ import (
gosdkCache "github.com/harness/ff-golang-server-sdk/cache"
harness "github.com/harness/ff-golang-server-sdk/client"
"github.com/harness/ff-golang-server-sdk/logger"
"github.com/harness/ff-golang-server-sdk/stream"
ffproxy "github.com/harness/ff-proxy"
sdkStream "github.com/harness/ff-golang-server-sdk/stream"
"github.com/harness/ff-proxy/cache"
"github.com/harness/ff-proxy/config"
"github.com/harness/ff-proxy/domain"
Expand Down Expand Up @@ -223,7 +226,7 @@ func init() {
flag.Parse()
}

func initFF(ctx context.Context, cache gosdkCache.Cache, baseURL, eventURL, envID, envIdent, projectIdent, sdkKey string, l log.Logger, eventListener stream.EventStreamListener) {
func initFF(ctx context.Context, cache gosdkCache.Cache, baseURL, eventURL, envID, envIdent, projectIdent, sdkKey string, l log.Logger, eventListener sdkStream.EventStreamListener) {

retryClient := retryablehttp.NewClient()
retryClient.RetryMax = 5
Expand Down Expand Up @@ -395,9 +398,9 @@ func main() {
projEnvInfo := envIDToProjectEnvironmentInfo[authConfig[domain.AuthAPIKey(apiKeyHash)]]

// Start an event listener for each embedded SDK
var eventListener stream.EventStreamListener
var eventListener sdkStream.EventStreamListener
if rc, ok := sdkCache.(*cache.RedisCache); ok {
eventListener = ffproxy.NewEventListener(logger, rc, apiKeyHasher)
eventListener = stream.NewEventListener(logger, rc, apiKeyHasher)
} else {
logger.Info("proxy is not configured with a redis cache, therefore streaming will not be enabled")
}
Expand Down Expand Up @@ -462,15 +465,15 @@ func main() {

if rc, ok := sdkCache.(*cache.RedisCache); ok {
logger.Info("starting stream worker...")
sc := ffproxy.NewCheckpointingStream(ctx, rc, rc, logger)
streamWorker := ffproxy.NewStreamWorker(logger, gpc, sc, t...)
sc := stream.NewCheckpointingStream(ctx, rc, rc, logger)
streamWorker := stream.NewStreamWorker(logger, gpc, sc, t...)
streamWorker.Run(ctx)
streamingEnabled = true
} else {
logger.Info("the proxy isn't configured with redis so the streamworker will not be started ")
}

tokenSource := ffproxy.NewTokenSource(logger, authRepo, apiKeyHasher, []byte(authSecret))
tokenSource := token.NewTokenSource(logger, authRepo, apiKeyHasher, []byte(authSecret))

featureEvaluator := proxyservice.NewFeatureEvaluator()

Expand Down
19 changes: 9 additions & 10 deletions checkpointing_stream.go → stream/checkpointing_stream.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package ffproxy
package stream

import (
"context"
"fmt"

"github.com/harness/ff-proxy/domain"
"github.com/harness/ff-proxy/log"
)

Expand All @@ -16,10 +15,10 @@ type Checkpointer interface {

type checkpoint struct {
key string
value domain.Checkpoint
value Checkpoint
}

func newCheckpoint(key string, value domain.Checkpoint) checkpoint {
func newCheckpoint(key string, value Checkpoint) checkpoint {
return checkpoint{key: key, value: value}
}

Expand All @@ -28,15 +27,15 @@ type checkpoints chan checkpoint
// CheckpointingStream is a stream that stores checkpoints of the last event
// processed so it can resume from this point in the event of a failure.
type CheckpointingStream struct {
stream domain.Stream
stream Stream
checkpoint Checkpointer
log log.Logger
checkpoints checkpoints
}

// NewCheckpointingStream creates a CheckpointingStream and starts a process
// that listens for checkpoints
func NewCheckpointingStream(ctx context.Context, s domain.Stream, c Checkpointer, l log.Logger) CheckpointingStream {
func NewCheckpointingStream(ctx context.Context, s Stream, c Checkpointer, l log.Logger) CheckpointingStream {
l = l.With("component", "StreamWorker")
sc := CheckpointingStream{stream: s, checkpoint: c, log: l, checkpoints: make(checkpoints)}
sc.setCheckpoints(ctx)
Expand All @@ -60,7 +59,7 @@ func (s CheckpointingStream) setCheckpoints(ctx context.Context) {
}

oldCheckpoint := s.fetchCheckpoint(ctx, c.key)
if c.value.IsOlder(domain.Checkpoint(oldCheckpoint)) {
if c.value.IsOlder(Checkpoint(oldCheckpoint)) {
continue
}

Expand All @@ -75,7 +74,7 @@ func (s CheckpointingStream) setCheckpoints(ctx context.Context) {
}

// Pub makes CheckpointingStream implement the Stream interface.
func (s CheckpointingStream) Pub(ctx context.Context, topic string, values domain.StreamEvent) error {
func (s CheckpointingStream) Pub(ctx context.Context, topic string, values StreamEvent) error {
return s.stream.Pub(ctx, topic, values)
}

Expand All @@ -85,14 +84,14 @@ func (s CheckpointingStream) Pub(ctx context.Context, topic string, values domai
// checkpoint then the point at which it begins subscribing to the stream is determined
// by the implementation of the underlying Stream that the CheckpointingStream
// uses.
func (s CheckpointingStream) Sub(ctx context.Context, topic string, checkpoint string, onReceive func(domain.StreamEvent)) error {
func (s CheckpointingStream) Sub(ctx context.Context, topic string, checkpoint string, onReceive func(StreamEvent)) error {
key := fmt.Sprintf("checkpoint-%s", topic)

if checkpoint == "" {
checkpoint = s.fetchCheckpoint(ctx, topic)
}

err := s.stream.Sub(ctx, topic, checkpoint, func(e domain.StreamEvent) {
err := s.stream.Sub(ctx, topic, checkpoint, func(e StreamEvent) {
onReceive(e)

select {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package ffproxy
package stream

import (
"context"
Expand All @@ -8,7 +8,6 @@ import (
"testing"
"time"

"github.com/harness/ff-proxy/domain"
"github.com/harness/ff-proxy/log"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -56,11 +55,11 @@ func (m *mockCheckpointer) GetKV(ctx context.Context, key string) (string, error
return cp, nil
}

var testEvents = []domain.StreamEvent{
{Checkpoint: "1-0", Values: map[domain.StreamEventValue]string{}},
{Checkpoint: "2-0", Values: map[domain.StreamEventValue]string{}},
{Checkpoint: "3-1", Values: map[domain.StreamEventValue]string{}},
{Checkpoint: "3-2", Values: map[domain.StreamEventValue]string{}},
var testEvents = []StreamEvent{
{Checkpoint: "1-0", Values: map[StreamEventValue]string{}},
{Checkpoint: "2-0", Values: map[StreamEventValue]string{}},
{Checkpoint: "3-1", Values: map[StreamEventValue]string{}},
{Checkpoint: "3-2", Values: map[StreamEventValue]string{}},
}

func TestStreamCheckpointer_Sub(t *testing.T) {
Expand All @@ -71,7 +70,7 @@ func TestStreamCheckpointer_Sub(t *testing.T) {
checkpointer *mockCheckpointer
checkpoint string
shouldErr bool
expected []domain.StreamEvent
expected []StreamEvent
}{
"Given I call Sub with a stream that errors": {
stream: newMockStream(errors.New("error"), topic, testEvents...),
Expand Down Expand Up @@ -103,8 +102,8 @@ func TestStreamCheckpointer_Sub(t *testing.T) {

cs := NewCheckpointingStream(ctx, tc.stream, tc.checkpointer, log.NewNoOpLogger())

actual := []domain.StreamEvent{}
err := cs.Sub(ctx, topic, tc.checkpoint, func(event domain.StreamEvent) {
actual := []StreamEvent{}
err := cs.Sub(ctx, topic, tc.checkpoint, func(event StreamEvent) {
actual = append(actual, event)
})

Expand All @@ -118,19 +117,19 @@ func TestStreamCheckpointer_Sub(t *testing.T) {
}

func TestCheckpointStream_SetCheckpoint(t *testing.T) {
outOfOrderTestEvents := []domain.StreamEvent{
{Checkpoint: "2-0", Values: map[domain.StreamEventValue]string{}},
{Checkpoint: "3-2", Values: map[domain.StreamEventValue]string{}},
{Checkpoint: "3-1", Values: map[domain.StreamEventValue]string{}},
{Checkpoint: "1-0", Values: map[domain.StreamEventValue]string{}},
outOfOrderTestEvents := []StreamEvent{
{Checkpoint: "2-0", Values: map[StreamEventValue]string{}},
{Checkpoint: "3-2", Values: map[StreamEventValue]string{}},
{Checkpoint: "3-1", Values: map[StreamEventValue]string{}},
{Checkpoint: "1-0", Values: map[StreamEventValue]string{}},
}

topic := "test-topic"

testCases := map[string]struct {
expectedCheckpoint string
checkpointer *mockCheckpointer
events []domain.StreamEvent
events []StreamEvent
shouldErr bool
sets int
}{
Expand Down Expand Up @@ -160,7 +159,7 @@ func TestCheckpointStream_SetCheckpoint(t *testing.T) {

i := 0
newCtx, cancel := context.WithCancel(ctx)
err := cs.Sub(newCtx, topic, "", func(e domain.StreamEvent) {
err := cs.Sub(newCtx, topic, "", func(e StreamEvent) {
if i == len(tc.events) {
cancel()
}
Expand Down
Loading