diff --git a/broker_redis_test.go b/broker_redis_test.go index c88ab2d6..c1e89185 100644 --- a/broker_redis_test.go +++ b/broker_redis_test.go @@ -24,11 +24,11 @@ func getUniquePrefix() string { return "centrifuge-test-" + randString(3) + "-" + strconv.FormatInt(time.Now().UnixNano(), 10) } -func newTestRedisBroker(tb testing.TB, n *Node, useStreams bool, useCluster bool) *RedisBroker { +func newTestRedisBroker(tb testing.TB, n *Node, useStreams bool, useCluster bool, port int) *RedisBroker { if useCluster { return NewTestRedisBrokerCluster(tb, n, getUniquePrefix(), useStreams) } - return NewTestRedisBroker(tb, n, getUniquePrefix(), useStreams) + return NewTestRedisBroker(tb, n, getUniquePrefix(), useStreams, port) } func testNode(tb testing.TB) *Node { @@ -49,17 +49,20 @@ func benchNode(tb testing.TB) *Node { return node } -func testRedisConf() RedisShardConfig { +func testSingleRedisConf(port int) RedisShardConfig { + if port == 0 { + port = 6379 + } return RedisShardConfig{ - Address: "127.0.0.1:6379", + Address: "127.0.0.1:" + strconv.Itoa(port), IOTimeout: 10 * time.Second, ConnectTimeout: 10 * time.Second, } } -func NewTestRedisBroker(tb testing.TB, n *Node, prefix string, useStreams bool) *RedisBroker { +func NewTestRedisBroker(tb testing.TB, n *Node, prefix string, useStreams bool, port int) *RedisBroker { tb.Helper() - redisConf := testRedisConf() + redisConf := testSingleRedisConf(port) s, err := NewRedisShard(n, redisConf) require.NoError(tb, err) e, err := NewRedisBroker(n, RedisBrokerConfig{ @@ -167,50 +170,82 @@ func TestRedisBrokerSentinel(t *testing.T) { require.NoError(t, err) } -type redisTest struct { +type historyRedisTest struct { Name string UseStreams bool UseCluster bool + Port int } -var redisTests = []redisTest{ - {"list", false, false}, - {"strm", true, false}, - {"list_cluster", false, true}, - {"strm_cluster", true, true}, +var historyRedisTests = []historyRedisTest{ + {"rd_single_list", false, false, 6379}, + {"rd_single_strm", true, false, 6379}, + {"df_single_list", false, false, 7369}, + {"df_single_strm", true, false, 7369}, + {"rd_cluster_list", false, true, 0}, + {"rd_cluster_strm", true, true, 0}, } -var benchRedisTests = func() (tests []redisTest) { - for _, useCluster := range []bool{false, true} { - if os.Getenv("CENTRIFUGE_REDIS_CLUSTER_BENCHMARKS") == "" && useCluster { +type noHistoryRedisTest struct { + Name string + UseCluster bool + Port int +} + +var noHistoryRedisTests = []noHistoryRedisTest{ + {"rd_single", false, 6379}, + {"df_single", false, 7369}, + {"rd_cluster", false, 0}, +} + +var historyBenchRedisTests = func() (tests []historyRedisTest) { + for _, t := range historyRedisTests { + if os.Getenv("CENTRIFUGE_REDIS_CLUSTER_BENCHMARKS") == "" && t.UseCluster { continue } - for _, useStream := range []bool{false, true} { - var name string - if useStream { - name = "strm" - } else { - name = "list" - } - if useCluster { - name += "_cluster" - } - tests = append(tests, redisTest{ - Name: name, - UseCluster: useCluster, - UseStreams: useStream, - }) + tests = append(tests, t) + } + return +}() + +var noHistoryBenchRedisTests = func() (tests []noHistoryRedisTest) { + for _, t := range noHistoryRedisTests { + if os.Getenv("CENTRIFUGE_REDIS_CLUSTER_BENCHMARKS") == "" && t.UseCluster { + continue } + tests = append(tests, t) } return }() +//func excludeHistoryClusterTests(tests []historyRedisTest) []historyRedisTest { +// var res []historyRedisTest +// for _, t := range tests { +// if t.UseCluster { +// continue +// } +// res = append(res, t) +// } +// return res +//} + +func excludeNoHistoryClusterTests(tests []noHistoryRedisTest) []noHistoryRedisTest { + var res []noHistoryRedisTest + for _, t := range tests { + if t.UseCluster { + continue + } + res = append(res, t) + } + return res +} + func TestRedisBroker(t *testing.T) { - for _, tt := range redisTests { + for _, tt := range historyRedisTests { t.Run(tt.Name, func(t *testing.T) { node := testNode(t) - b := newTestRedisBroker(t, node, tt.UseStreams, tt.UseCluster) + b := newTestRedisBroker(t, node, tt.UseStreams, tt.UseCluster, tt.Port) defer func() { _ = node.Shutdown(context.Background()) }() defer stopRedisBroker(b) @@ -291,11 +326,11 @@ func TestRedisBroker(t *testing.T) { } func TestRedisBrokerPublishIdempotent(t *testing.T) { - for _, tt := range redisTests { + for _, tt := range historyRedisTests { t.Run(tt.Name, func(t *testing.T) { node := testNode(t) - b := newTestRedisBroker(t, node, tt.UseStreams, tt.UseCluster) + b := newTestRedisBroker(t, node, tt.UseStreams, tt.UseCluster, tt.Port) defer func() { _ = node.Shutdown(context.Background()) }() defer stopRedisBroker(b) @@ -347,10 +382,10 @@ func TestRedisBrokerPublishIdempotent(t *testing.T) { } func TestRedisCurrentPosition(t *testing.T) { - for _, tt := range redisTests { + for _, tt := range historyRedisTests { t.Run(tt.Name, func(t *testing.T) { node := testNode(t) - b := newTestRedisBroker(t, node, tt.UseStreams, tt.UseCluster) + b := newTestRedisBroker(t, node, tt.UseStreams, tt.UseCluster, tt.Port) defer func() { _ = node.Shutdown(context.Background()) }() defer stopRedisBroker(b) @@ -380,10 +415,10 @@ func TestRedisCurrentPosition(t *testing.T) { } func TestRedisBrokerRecover(t *testing.T) { - for _, tt := range redisTests { + for _, tt := range historyRedisTests { t.Run(tt.Name, func(t *testing.T) { node := testNode(t) - b := newTestRedisBroker(t, node, tt.UseStreams, tt.UseCluster) + b := newTestRedisBroker(t, node, tt.UseStreams, tt.UseCluster, tt.Port) defer func() { _ = node.Shutdown(context.Background()) }() defer stopRedisBroker(b) @@ -457,166 +492,170 @@ func pubSubChannels(t *testing.T, e *RedisBroker) ([]string, error) { } func TestRedisBrokerSubscribeUnsubscribe(t *testing.T) { - // Custom prefix to not collide with other tests. - node := testNode(t) - b := NewTestRedisBroker(t, node, getUniquePrefix(), false) - defer func() { _ = node.Shutdown(context.Background()) }() - defer stopRedisBroker(b) + for _, tt := range noHistoryRedisTests { + t.Run(tt.Name, func(t *testing.T) { + // Custom prefix to not collide with other tests. + node := testNode(t) + b := NewTestRedisBroker(t, node, getUniquePrefix(), false, 0) + defer func() { _ = node.Shutdown(context.Background()) }() + defer stopRedisBroker(b) - if b.shards[0].shard.useCluster { - t.Skip("Channels command is not supported when Redis Cluster is used") - } + if b.shards[0].shard.useCluster { + t.Skip("Channels command is not supported when Redis Cluster is used") + } - require.NoError(t, b.Subscribe("1-test")) - require.NoError(t, b.Subscribe("1-test")) - channels, err := pubSubChannels(t, b) - require.NoError(t, err) - if len(channels) != 1 { - // Redis PUBSUB CHANNELS command looks like eventual consistent, so sometimes - // it returns wrong results, sleeping for a while helps in such situations. - // See https://gist.github.com/FZambia/80a5241e06b4662f7fe89cfaf24072c3 - time.Sleep(2000 * time.Millisecond) - channels, err := pubSubChannels(t, b) - require.NoError(t, err) - require.Equal(t, 1, len(channels), fmt.Sprintf("%#v", channels)) - } + require.NoError(t, b.Subscribe("1-test")) + require.NoError(t, b.Subscribe("1-test")) + channels, err := pubSubChannels(t, b) + require.NoError(t, err) + if len(channels) != 1 { + // Redis PUBSUB CHANNELS command looks like eventual consistent, so sometimes + // it returns wrong results, sleeping for a while helps in such situations. + // See https://gist.github.com/FZambia/80a5241e06b4662f7fe89cfaf24072c3 + time.Sleep(2000 * time.Millisecond) + channels, err := pubSubChannels(t, b) + require.NoError(t, err) + require.Equal(t, 1, len(channels), fmt.Sprintf("%#v", channels)) + } - require.NoError(t, b.Unsubscribe("1-test")) - channels, err = pubSubChannels(t, b) - require.NoError(t, err) - if len(channels) != 0 { - time.Sleep(2000 * time.Millisecond) - channels, _ := pubSubChannels(t, b) - require.Equal(t, 0, len(channels), fmt.Sprintf("%#v", channels)) - } + require.NoError(t, b.Unsubscribe("1-test")) + channels, err = pubSubChannels(t, b) + require.NoError(t, err) + if len(channels) != 0 { + time.Sleep(2000 * time.Millisecond) + channels, _ := pubSubChannels(t, b) + require.Equal(t, 0, len(channels), fmt.Sprintf("%#v", channels)) + } - var wg sync.WaitGroup + var wg sync.WaitGroup - // The same channel in parallel. - for i := 0; i < 100; i++ { - wg.Add(1) - go func() { - defer wg.Done() - require.NoError(t, b.Subscribe("2-test")) - require.NoError(t, b.Unsubscribe("2-test")) - }() - } - wg.Wait() - channels, err = pubSubChannels(t, b) - require.NoError(t, err) + // The same channel in parallel. + for i := 0; i < 100; i++ { + wg.Add(1) + go func() { + defer wg.Done() + require.NoError(t, b.Subscribe("2-test")) + require.NoError(t, b.Unsubscribe("2-test")) + }() + } + wg.Wait() + channels, err = pubSubChannels(t, b) + require.NoError(t, err) - if len(channels) != 0 { - time.Sleep(2000 * time.Millisecond) - channels, _ := pubSubChannels(t, b) - require.Equal(t, 0, len(channels), fmt.Sprintf("%#v", channels)) - } - - // Different channels in parallel. - for i := 0; i < 100; i++ { - wg.Add(1) - go func(i int) { - defer wg.Done() - require.NoError(t, b.Subscribe("3-test-"+strconv.Itoa(i))) - require.NoError(t, b.Unsubscribe("3-test-"+strconv.Itoa(i))) - }(i) - } - wg.Wait() - channels, err = pubSubChannels(t, b) - require.Equal(t, nil, err) - if len(channels) != 0 { - time.Sleep(2000 * time.Millisecond) - channels, err := pubSubChannels(t, b) - require.NoError(t, err) - require.Equal(t, 0, len(channels), fmt.Sprintf("%#v", channels)) - } + if len(channels) != 0 { + time.Sleep(2000 * time.Millisecond) + channels, _ := pubSubChannels(t, b) + require.Equal(t, 0, len(channels), fmt.Sprintf("%#v", channels)) + } - // The same channel sequential. - for i := 0; i < 1000; i++ { - require.NoError(t, b.Subscribe("4-test")) - require.NoError(t, b.Unsubscribe("4-test")) - } - channels, err = pubSubChannels(t, b) - require.NoError(t, err) - if len(channels) != 0 { - time.Sleep(2000 * time.Millisecond) - channels, _ := pubSubChannels(t, b) - require.Equal(t, 0, len(channels), fmt.Sprintf("%#v", channels)) - } + // Different channels in parallel. + for i := 0; i < 100; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + require.NoError(t, b.Subscribe("3-test-"+strconv.Itoa(i))) + require.NoError(t, b.Unsubscribe("3-test-"+strconv.Itoa(i))) + }(i) + } + wg.Wait() + channels, err = pubSubChannels(t, b) + require.Equal(t, nil, err) + if len(channels) != 0 { + time.Sleep(2000 * time.Millisecond) + channels, err := pubSubChannels(t, b) + require.NoError(t, err) + require.Equal(t, 0, len(channels), fmt.Sprintf("%#v", channels)) + } - // Different channels sequential. - for j := 0; j < 10; j++ { - for i := 0; i < 100; i++ { - require.NoError(t, b.Subscribe("5-test-"+strconv.Itoa(i))) - require.NoError(t, b.Unsubscribe("5-test-"+strconv.Itoa(i))) - } - channels, err = pubSubChannels(t, b) - require.NoError(t, err) - if len(channels) != 0 { - time.Sleep(2000 * time.Millisecond) - channels, err := pubSubChannels(t, b) + // The same channel sequential. + for i := 0; i < 1000; i++ { + require.NoError(t, b.Subscribe("4-test")) + require.NoError(t, b.Unsubscribe("4-test")) + } + channels, err = pubSubChannels(t, b) require.NoError(t, err) - require.Equal(t, 0, len(channels), fmt.Sprintf("%#v", channels)) - } - } + if len(channels) != 0 { + time.Sleep(2000 * time.Millisecond) + channels, _ := pubSubChannels(t, b) + require.Equal(t, 0, len(channels), fmt.Sprintf("%#v", channels)) + } - // Different channels subscribe only in parallel. - for i := 0; i < 100; i++ { - wg.Add(1) - go func(i int) { - defer wg.Done() - require.NoError(t, b.Subscribe("6-test-"+strconv.Itoa(i))) - }(i) - } - wg.Wait() - channels, err = pubSubChannels(t, b) - require.NoError(t, err) - if len(channels) != 100 { - time.Sleep(2000 * time.Millisecond) - channels, err := pubSubChannels(t, b) - require.NoError(t, err) - require.Equal(t, 100, len(channels), fmt.Sprintf("%#v", channels)) - } + // Different channels sequential. + for j := 0; j < 10; j++ { + for i := 0; i < 100; i++ { + require.NoError(t, b.Subscribe("5-test-"+strconv.Itoa(i))) + require.NoError(t, b.Unsubscribe("5-test-"+strconv.Itoa(i))) + } + channels, err = pubSubChannels(t, b) + require.NoError(t, err) + if len(channels) != 0 { + time.Sleep(2000 * time.Millisecond) + channels, err := pubSubChannels(t, b) + require.NoError(t, err) + require.Equal(t, 0, len(channels), fmt.Sprintf("%#v", channels)) + } + } - // Different channels unsubscribe only in parallel. - for i := 0; i < 100; i++ { - wg.Add(1) - go func(i int) { - defer wg.Done() - require.NoError(t, b.Unsubscribe("6-test-"+strconv.Itoa(i))) - }(i) - } - wg.Wait() - channels, err = pubSubChannels(t, b) - require.NoError(t, err) - if len(channels) != 0 { - time.Sleep(2000 * time.Millisecond) - channels, _ := pubSubChannels(t, b) - require.Equal(t, 0, len(channels), fmt.Sprintf("%#v", channels)) - } - - for i := 0; i < 100; i++ { - wg.Add(1) - go func(i int) { - defer wg.Done() - require.NoError(t, b.Unsubscribe("7-test-"+strconv.Itoa(i))) - require.NoError(t, b.Unsubscribe("8-test-"+strconv.Itoa(i))) - require.NoError(t, b.Subscribe("8-test-"+strconv.Itoa(i))) - require.NoError(t, b.Unsubscribe("9-test-"+strconv.Itoa(i))) - require.NoError(t, b.Subscribe("7-test-"+strconv.Itoa(i))) - require.NoError(t, b.Unsubscribe("8-test-"+strconv.Itoa(i))) - require.NoError(t, b.Subscribe("9-test-"+strconv.Itoa(i))) - require.NoError(t, b.Unsubscribe("9-test-"+strconv.Itoa(i))) - require.NoError(t, b.Unsubscribe("7-test-"+strconv.Itoa(i))) - }(i) - } - wg.Wait() - channels, err = pubSubChannels(t, b) - require.NoError(t, err) - if len(channels) != 0 { - time.Sleep(2000 * time.Millisecond) - channels, err := pubSubChannels(t, b) - require.NoError(t, err) - require.Equal(t, 0, len(channels), fmt.Sprintf("%#v", channels)) + // Different channels subscribe only in parallel. + for i := 0; i < 100; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + require.NoError(t, b.Subscribe("6-test-"+strconv.Itoa(i))) + }(i) + } + wg.Wait() + channels, err = pubSubChannels(t, b) + require.NoError(t, err) + if len(channels) != 100 { + time.Sleep(2000 * time.Millisecond) + channels, err := pubSubChannels(t, b) + require.NoError(t, err) + require.Equal(t, 100, len(channels), fmt.Sprintf("%#v", channels)) + } + + // Different channels unsubscribe only in parallel. + for i := 0; i < 100; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + require.NoError(t, b.Unsubscribe("6-test-"+strconv.Itoa(i))) + }(i) + } + wg.Wait() + channels, err = pubSubChannels(t, b) + require.NoError(t, err) + if len(channels) != 0 { + time.Sleep(2000 * time.Millisecond) + channels, _ := pubSubChannels(t, b) + require.Equal(t, 0, len(channels), fmt.Sprintf("%#v", channels)) + } + + for i := 0; i < 100; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + require.NoError(t, b.Unsubscribe("7-test-"+strconv.Itoa(i))) + require.NoError(t, b.Unsubscribe("8-test-"+strconv.Itoa(i))) + require.NoError(t, b.Subscribe("8-test-"+strconv.Itoa(i))) + require.NoError(t, b.Unsubscribe("9-test-"+strconv.Itoa(i))) + require.NoError(t, b.Subscribe("7-test-"+strconv.Itoa(i))) + require.NoError(t, b.Unsubscribe("8-test-"+strconv.Itoa(i))) + require.NoError(t, b.Subscribe("9-test-"+strconv.Itoa(i))) + require.NoError(t, b.Unsubscribe("9-test-"+strconv.Itoa(i))) + require.NoError(t, b.Unsubscribe("7-test-"+strconv.Itoa(i))) + }(i) + } + wg.Wait() + channels, err = pubSubChannels(t, b) + require.NoError(t, err) + if len(channels) != 0 { + time.Sleep(2000 * time.Millisecond) + channels, err := pubSubChannels(t, b) + require.NoError(t, err) + require.Equal(t, 0, len(channels), fmt.Sprintf("%#v", channels)) + } + }) } } @@ -681,63 +720,67 @@ func TestRedisConsistentIndex(t *testing.T) { } func TestRedisBrokerHandlePubSubMessage(t *testing.T) { - node := testNode(t) - b := NewTestRedisBroker(t, node, getUniquePrefix(), false) - defer func() { _ = node.Shutdown(context.Background()) }() - defer stopRedisBroker(b) - err := b.handleRedisClientMessage(&testBrokerEventHandler{HandlePublicationFunc: func(ch string, pub *Publication, sp StreamPosition, delta bool, prevPub *Publication) error { - require.Equal(t, "test", ch) - require.Equal(t, uint64(16901), sp.Offset) - require.Equal(t, "xyz", sp.Epoch) - return nil - }}, b.messageChannelID(b.shards[0].shard, "test"), []byte("__p1:16901:xyz__dsdsd")) - require.Error(t, err) - - err = b.handleRedisClientMessage(&testBrokerEventHandler{HandlePublicationFunc: func(ch string, pub *Publication, sp StreamPosition, delta bool, prevPub *Publication) error { - return nil - }}, b.messageChannelID(b.shards[0].shard, "test"), []byte("__p1:16901")) - require.Error(t, err) - - pub := &protocol.Publication{ - Data: []byte("{}"), - } - data, err := pub.MarshalVT() - require.NoError(t, err) - var publicationHandlerCalled bool - err = b.handleRedisClientMessage(&testBrokerEventHandler{HandlePublicationFunc: func(ch string, pub *Publication, sp StreamPosition, delta bool, prevPub *Publication) error { - publicationHandlerCalled = true - require.Equal(t, "test", ch) - require.Equal(t, uint64(16901), sp.Offset) - require.Equal(t, "xyz", sp.Epoch) - return nil - }}, b.messageChannelID(b.shards[0].shard, "test"), []byte("__p1:16901:xyz__"+string(data))) - require.NoError(t, err) - require.True(t, publicationHandlerCalled) + for _, tt := range excludeNoHistoryClusterTests(noHistoryRedisTests) { + t.Run(tt.Name, func(t *testing.T) { + node := testNode(t) + b := NewTestRedisBroker(t, node, getUniquePrefix(), tt.UseCluster, tt.Port) + defer func() { _ = node.Shutdown(context.Background()) }() + defer stopRedisBroker(b) + err := b.handleRedisClientMessage(&testBrokerEventHandler{HandlePublicationFunc: func(ch string, pub *Publication, sp StreamPosition, delta bool, prevPub *Publication) error { + require.Equal(t, "test", ch) + require.Equal(t, uint64(16901), sp.Offset) + require.Equal(t, "xyz", sp.Epoch) + return nil + }}, b.messageChannelID(b.shards[0].shard, "test"), []byte("__p1:16901:xyz__dsdsd")) + require.Error(t, err) + + err = b.handleRedisClientMessage(&testBrokerEventHandler{HandlePublicationFunc: func(ch string, pub *Publication, sp StreamPosition, delta bool, prevPub *Publication) error { + return nil + }}, b.messageChannelID(b.shards[0].shard, "test"), []byte("__p1:16901")) + require.Error(t, err) + + pub := &protocol.Publication{ + Data: []byte("{}"), + } + data, err := pub.MarshalVT() + require.NoError(t, err) + var publicationHandlerCalled bool + err = b.handleRedisClientMessage(&testBrokerEventHandler{HandlePublicationFunc: func(ch string, pub *Publication, sp StreamPosition, delta bool, prevPub *Publication) error { + publicationHandlerCalled = true + require.Equal(t, "test", ch) + require.Equal(t, uint64(16901), sp.Offset) + require.Equal(t, "xyz", sp.Epoch) + return nil + }}, b.messageChannelID(b.shards[0].shard, "test"), []byte("__p1:16901:xyz__"+string(data))) + require.NoError(t, err) + require.True(t, publicationHandlerCalled) - info := &protocol.ClientInfo{ - User: "12", + info := &protocol.ClientInfo{ + User: "12", + } + data, err = info.MarshalVT() + require.NoError(t, err) + var joinHandlerCalled bool + err = b.handleRedisClientMessage(&testBrokerEventHandler{HandleJoinFunc: func(ch string, info *ClientInfo) error { + joinHandlerCalled = true + require.Equal(t, "test", ch) + require.Equal(t, "12", info.UserID) + return nil + }}, b.messageChannelID(b.shards[0].shard, "test"), append(joinTypePrefix, data...)) + require.NoError(t, err) + require.True(t, joinHandlerCalled) + + var leaveHandlerCalled bool + err = b.handleRedisClientMessage(&testBrokerEventHandler{HandleLeaveFunc: func(ch string, info *ClientInfo) error { + leaveHandlerCalled = true + require.Equal(t, "test", ch) + require.Equal(t, "12", info.UserID) + return nil + }}, b.messageChannelID(b.shards[0].shard, "test"), append(leaveTypePrefix, data...)) + require.NoError(t, err) + require.True(t, leaveHandlerCalled) + }) } - data, err = info.MarshalVT() - require.NoError(t, err) - var joinHandlerCalled bool - err = b.handleRedisClientMessage(&testBrokerEventHandler{HandleJoinFunc: func(ch string, info *ClientInfo) error { - joinHandlerCalled = true - require.Equal(t, "test", ch) - require.Equal(t, "12", info.UserID) - return nil - }}, b.messageChannelID(b.shards[0].shard, "test"), append(joinTypePrefix, data...)) - require.NoError(t, err) - require.True(t, joinHandlerCalled) - - var leaveHandlerCalled bool - err = b.handleRedisClientMessage(&testBrokerEventHandler{HandleLeaveFunc: func(ch string, info *ClientInfo) error { - leaveHandlerCalled = true - require.Equal(t, "test", ch) - require.Equal(t, "12", info.UserID) - return nil - }}, b.messageChannelID(b.shards[0].shard, "test"), append(leaveTypePrefix, data...)) - require.NoError(t, err) - require.True(t, leaveHandlerCalled) } func BenchmarkRedisExtractPushData(b *testing.B) { @@ -798,231 +841,243 @@ func TestRedisExtractPushData(t *testing.T) { } func TestNode_OnSurvey_TwoNodes(t *testing.T) { - redisConf := testRedisConf() + for _, tt := range excludeNoHistoryClusterTests(noHistoryRedisTests) { + t.Run(tt.Name, func(t *testing.T) { + redisConf := testSingleRedisConf(tt.Port) - node1, _ := New(Config{}) + node1, _ := New(Config{}) - s, err := NewRedisShard(node1, redisConf) - require.NoError(t, err) + s, err := NewRedisShard(node1, redisConf) + require.NoError(t, err) - prefix := getUniquePrefix() + prefix := getUniquePrefix() - b1, _ := NewRedisBroker(node1, RedisBrokerConfig{ - Prefix: prefix, - Shards: []*RedisShard{s}, - }) - node1.SetBroker(b1) - _ = node1.Run() - defer func() { _ = node1.Shutdown(context.Background()) }() - defer stopRedisBroker(b1) + b1, _ := NewRedisBroker(node1, RedisBrokerConfig{ + Prefix: prefix, + Shards: []*RedisShard{s}, + }) + node1.SetBroker(b1) + _ = node1.Run() + defer func() { _ = node1.Shutdown(context.Background()) }() + defer stopRedisBroker(b1) - node1.OnSurvey(func(event SurveyEvent, callback SurveyCallback) { - require.Nil(t, event.Data) - require.Equal(t, "test_op", event.Op) - callback(SurveyReply{ - Data: []byte("1"), - Code: 1, - }) - }) + node1.OnSurvey(func(event SurveyEvent, callback SurveyCallback) { + require.Nil(t, event.Data) + require.Equal(t, "test_op", event.Op) + callback(SurveyReply{ + Data: []byte("1"), + Code: 1, + }) + }) - node2, _ := New(Config{}) + node2, _ := New(Config{}) - s2, err := NewRedisShard(node2, redisConf) - require.NoError(t, err) - b2, _ := NewRedisBroker(node2, RedisBrokerConfig{ - Prefix: prefix, - Shards: []*RedisShard{s2}, - }) - node2.SetBroker(b2) - _ = node2.Run() - defer func() { _ = node2.Shutdown(context.Background()) }() - defer stopRedisBroker(b2) + s2, err := NewRedisShard(node2, redisConf) + require.NoError(t, err) + b2, _ := NewRedisBroker(node2, RedisBrokerConfig{ + Prefix: prefix, + Shards: []*RedisShard{s2}, + }) + node2.SetBroker(b2) + _ = node2.Run() + defer func() { _ = node2.Shutdown(context.Background()) }() + defer stopRedisBroker(b2) - node2.OnSurvey(func(event SurveyEvent, callback SurveyCallback) { - require.Nil(t, event.Data) - require.Equal(t, "test_op", event.Op) - callback(SurveyReply{ - Data: []byte("2"), - Code: 2, - }) - }) + node2.OnSurvey(func(event SurveyEvent, callback SurveyCallback) { + require.Nil(t, event.Data) + require.Equal(t, "test_op", event.Op) + callback(SurveyReply{ + Data: []byte("2"), + Code: 2, + }) + }) - waitAllNodes(t, node1, 2) + waitAllNodes(t, node1, 2) - results, err := node1.Survey(context.Background(), "test_op", nil, "") - require.NoError(t, err) - require.Len(t, results, 2) - res, ok := results[node1.ID()] - require.True(t, ok) - require.Equal(t, uint32(1), res.Code) - require.Equal(t, []byte("1"), res.Data) - res, ok = results[node2.ID()] - require.True(t, ok) - require.Equal(t, uint32(2), res.Code) - require.Equal(t, []byte("2"), res.Data) + results, err := node1.Survey(context.Background(), "test_op", nil, "") + require.NoError(t, err) + require.Len(t, results, 2) + res, ok := results[node1.ID()] + require.True(t, ok) + require.Equal(t, uint32(1), res.Code) + require.Equal(t, []byte("1"), res.Data) + res, ok = results[node2.ID()] + require.True(t, ok) + require.Equal(t, uint32(2), res.Code) + require.Equal(t, []byte("2"), res.Data) + }) + } } func TestNode_OnNotification_TwoNodes(t *testing.T) { - redisConf := testRedisConf() + for _, tt := range excludeNoHistoryClusterTests(noHistoryRedisTests) { + t.Run(tt.Name, func(t *testing.T) { + redisConf := testSingleRedisConf(tt.Port) - node1, _ := New(Config{}) + node1, _ := New(Config{}) - s, err := NewRedisShard(node1, redisConf) - require.NoError(t, err) + s, err := NewRedisShard(node1, redisConf) + require.NoError(t, err) - prefix := getUniquePrefix() + prefix := getUniquePrefix() - b1, _ := NewRedisBroker(node1, RedisBrokerConfig{ - Prefix: prefix, - Shards: []*RedisShard{s}, - }) - node1.SetBroker(b1) - _ = node1.Run() - defer func() { _ = node1.Shutdown(context.Background()) }() - defer stopRedisBroker(b1) + b1, _ := NewRedisBroker(node1, RedisBrokerConfig{ + Prefix: prefix, + Shards: []*RedisShard{s}, + }) + node1.SetBroker(b1) + _ = node1.Run() + defer func() { _ = node1.Shutdown(context.Background()) }() + defer stopRedisBroker(b1) - ch1 := make(chan struct{}) + ch1 := make(chan struct{}) - node1.OnNotification(func(event NotificationEvent) { - require.Equal(t, []byte(`notification`), event.Data) - require.Equal(t, "test_op", event.Op) - require.NotEmpty(t, event.FromNodeID) - require.Equal(t, node1.ID(), event.FromNodeID) - close(ch1) - }) + node1.OnNotification(func(event NotificationEvent) { + require.Equal(t, []byte(`notification`), event.Data) + require.Equal(t, "test_op", event.Op) + require.NotEmpty(t, event.FromNodeID) + require.Equal(t, node1.ID(), event.FromNodeID) + close(ch1) + }) - node2, _ := New(Config{}) + node2, _ := New(Config{}) - s2, err := NewRedisShard(node2, redisConf) - require.NoError(t, err) - b2, _ := NewRedisBroker(node2, RedisBrokerConfig{ - Prefix: prefix, - Shards: []*RedisShard{s2}, - }) - node2.SetBroker(b2) - _ = node2.Run() - defer func() { _ = node2.Shutdown(context.Background()) }() - defer stopRedisBroker(b2) + s2, err := NewRedisShard(node2, redisConf) + require.NoError(t, err) + b2, _ := NewRedisBroker(node2, RedisBrokerConfig{ + Prefix: prefix, + Shards: []*RedisShard{s2}, + }) + node2.SetBroker(b2) + _ = node2.Run() + defer func() { _ = node2.Shutdown(context.Background()) }() + defer stopRedisBroker(b2) - ch2 := make(chan struct{}) + ch2 := make(chan struct{}) - node2.OnNotification(func(event NotificationEvent) { - require.Equal(t, []byte(`notification`), event.Data) - require.Equal(t, "test_op", event.Op) - require.NotEqual(t, node2.ID(), event.FromNodeID) - close(ch2) - }) + node2.OnNotification(func(event NotificationEvent) { + require.Equal(t, []byte(`notification`), event.Data) + require.Equal(t, "test_op", event.Op) + require.NotEqual(t, node2.ID(), event.FromNodeID) + close(ch2) + }) - waitAllNodes(t, node1, 2) + waitAllNodes(t, node1, 2) - err = node1.Notify("test_op", []byte(`notification`), "") - require.NoError(t, err) - tm := time.After(5 * time.Second) - select { - case <-ch1: - select { - case <-ch2: - case <-tm: - t.Fatal("timeout on ch2") - } - case <-tm: - t.Fatal("timeout on ch1") + err = node1.Notify("test_op", []byte(`notification`), "") + require.NoError(t, err) + tm := time.After(5 * time.Second) + select { + case <-ch1: + select { + case <-ch2: + case <-tm: + t.Fatal("timeout on ch2") + } + case <-tm: + t.Fatal("timeout on ch1") + } + }) } } func TestRedisPubSubTwoNodes(t *testing.T) { - redisConf := testRedisConf() + for _, tt := range excludeNoHistoryClusterTests(noHistoryRedisTests) { + t.Run(tt.Name, func(t *testing.T) { + redisConf := testSingleRedisConf(tt.Port) - prefix := getUniquePrefix() + prefix := getUniquePrefix() - node1, _ := New(Config{}) - s, err := NewRedisShard(node1, redisConf) - require.NoError(t, err) - b1, _ := NewRedisBroker(node1, RedisBrokerConfig{ - Prefix: prefix, - Shards: []*RedisShard{s}, - numPubSubSubscribers: 4, - numPubSubProcessors: 2, - }) - node1.SetBroker(b1) - defer func() { _ = node1.Shutdown(context.Background()) }() - defer stopRedisBroker(b1) + node1, _ := New(Config{}) + s, err := NewRedisShard(node1, redisConf) + require.NoError(t, err) + b1, _ := NewRedisBroker(node1, RedisBrokerConfig{ + Prefix: prefix, + Shards: []*RedisShard{s}, + numPubSubSubscribers: 4, + numPubSubProcessors: 2, + }) + node1.SetBroker(b1) + defer func() { _ = node1.Shutdown(context.Background()) }() + defer stopRedisBroker(b1) - msgNum := 10 - var numPublications int64 - var numJoins int64 - var numLeaves int64 - pubCh := make(chan struct{}) - joinCh := make(chan struct{}) - leaveCh := make(chan struct{}) - brokerEventHandler := &testBrokerEventHandler{ - HandleControlFunc: func(bytes []byte) error { - return nil - }, - HandlePublicationFunc: func(ch string, pub *Publication, sp StreamPosition, delta bool, prevPub *Publication) error { - c := atomic.AddInt64(&numPublications, 1) - if c == int64(msgNum) { - close(pubCh) - } - return nil - }, - HandleJoinFunc: func(ch string, info *ClientInfo) error { - c := atomic.AddInt64(&numJoins, 1) - if c == int64(msgNum) { - close(joinCh) - } - return nil - }, - HandleLeaveFunc: func(ch string, info *ClientInfo) error { - c := atomic.AddInt64(&numLeaves, 1) - if c == int64(msgNum) { - close(leaveCh) + msgNum := 10 + var numPublications int64 + var numJoins int64 + var numLeaves int64 + pubCh := make(chan struct{}) + joinCh := make(chan struct{}) + leaveCh := make(chan struct{}) + brokerEventHandler := &testBrokerEventHandler{ + HandleControlFunc: func(bytes []byte) error { + return nil + }, + HandlePublicationFunc: func(ch string, pub *Publication, sp StreamPosition, delta bool, prevPub *Publication) error { + c := atomic.AddInt64(&numPublications, 1) + if c == int64(msgNum) { + close(pubCh) + } + return nil + }, + HandleJoinFunc: func(ch string, info *ClientInfo) error { + c := atomic.AddInt64(&numJoins, 1) + if c == int64(msgNum) { + close(joinCh) + } + return nil + }, + HandleLeaveFunc: func(ch string, info *ClientInfo) error { + c := atomic.AddInt64(&numLeaves, 1) + if c == int64(msgNum) { + close(leaveCh) + } + return nil + }, } - return nil - }, - } - _ = b1.Run(brokerEventHandler) + _ = b1.Run(brokerEventHandler) - for i := 0; i < msgNum; i++ { - require.NoError(t, b1.Subscribe("test"+strconv.Itoa(i))) - } + for i := 0; i < msgNum; i++ { + require.NoError(t, b1.Subscribe("test"+strconv.Itoa(i))) + } - node2, _ := New(Config{}) - s2, err := NewRedisShard(node2, redisConf) - require.NoError(t, err) + node2, _ := New(Config{}) + s2, err := NewRedisShard(node2, redisConf) + require.NoError(t, err) - b2, _ := NewRedisBroker(node2, RedisBrokerConfig{ - Prefix: prefix, - Shards: []*RedisShard{s2}, - }) - node2.SetBroker(b2) - _ = node2.Run() - defer func() { _ = node2.Shutdown(context.Background()) }() - defer stopRedisBroker(b2) + b2, _ := NewRedisBroker(node2, RedisBrokerConfig{ + Prefix: prefix, + Shards: []*RedisShard{s2}, + }) + node2.SetBroker(b2) + _ = node2.Run() + defer func() { _ = node2.Shutdown(context.Background()) }() + defer stopRedisBroker(b2) - for i := 0; i < msgNum; i++ { - _, err = node2.Publish("test"+strconv.Itoa(i), []byte("123")) - require.NoError(t, err) - err = b2.PublishJoin("test"+strconv.Itoa(i), &ClientInfo{}) - require.NoError(t, err) - err = b2.PublishLeave("test"+strconv.Itoa(i), &ClientInfo{}) - require.NoError(t, err) - } + for i := 0; i < msgNum; i++ { + _, err = node2.Publish("test"+strconv.Itoa(i), []byte("123")) + require.NoError(t, err) + err = b2.PublishJoin("test"+strconv.Itoa(i), &ClientInfo{}) + require.NoError(t, err) + err = b2.PublishLeave("test"+strconv.Itoa(i), &ClientInfo{}) + require.NoError(t, err) + } - select { - case <-pubCh: - case <-time.After(time.Second): - require.Fail(t, "timeout waiting for PUB/SUB message") - } - select { - case <-joinCh: - case <-time.After(time.Second): - require.Fail(t, "timeout waiting for PUB/SUB join message") - } - select { - case <-leaveCh: - case <-time.After(time.Second): - require.Fail(t, "timeout waiting for PUB/SUB leave message") + select { + case <-pubCh: + case <-time.After(time.Second): + require.Fail(t, "timeout waiting for PUB/SUB message") + } + select { + case <-joinCh: + case <-time.After(time.Second): + require.Fail(t, "timeout waiting for PUB/SUB join message") + } + select { + case <-leaveCh: + case <-time.After(time.Second): + require.Fail(t, "timeout waiting for PUB/SUB leave message") + } + }) } } @@ -1034,86 +1089,91 @@ type testDeltaPublishHandle struct { } func TestRedisPubSubTwoNodesWithDelta(t *testing.T) { - redisConf := testRedisConf() + // This is special because it actually uses history, but we re-use existing infrastructure. + for _, tt := range excludeNoHistoryClusterTests(noHistoryRedisTests) { + t.Run(tt.Name, func(t *testing.T) { + redisConf := testSingleRedisConf(tt.Port) - prefix := getUniquePrefix() + prefix := getUniquePrefix() - ch := "test" + uuid.NewString() + ch := "test" + uuid.NewString() - node1, _ := New(Config{}) - s, err := NewRedisShard(node1, redisConf) - require.NoError(t, err) - b1, _ := NewRedisBroker(node1, RedisBrokerConfig{ - Prefix: prefix, - Shards: []*RedisShard{s}, - numPubSubSubscribers: 4, - numPubSubProcessors: 2, - }) - node1.SetBroker(b1) - defer func() { _ = node1.Shutdown(context.Background()) }() - defer stopRedisBroker(b1) + node1, _ := New(Config{}) + s, err := NewRedisShard(node1, redisConf) + require.NoError(t, err) + b1, _ := NewRedisBroker(node1, RedisBrokerConfig{ + Prefix: prefix, + Shards: []*RedisShard{s}, + numPubSubSubscribers: 4, + numPubSubProcessors: 2, + }) + node1.SetBroker(b1) + defer func() { _ = node1.Shutdown(context.Background()) }() + defer stopRedisBroker(b1) - msgNum := 2 - var numPublications int64 - pubCh := make(chan struct{}) - var resultsMu sync.Mutex - results := make([]testDeltaPublishHandle, 0, msgNum) + msgNum := 2 + var numPublications int64 + pubCh := make(chan struct{}) + var resultsMu sync.Mutex + results := make([]testDeltaPublishHandle, 0, msgNum) - brokerEventHandler := &testBrokerEventHandler{ - HandleControlFunc: func(bytes []byte) error { - return nil - }, - HandlePublicationFunc: func(ch string, pub *Publication, sp StreamPosition, delta bool, prevPub *Publication) error { - resultsMu.Lock() - defer resultsMu.Unlock() - results = append(results, testDeltaPublishHandle{ - ch: ch, - pub: pub, - sp: sp, - prevPub: prevPub, - }) - c := atomic.AddInt64(&numPublications, 1) - if c == int64(msgNum) { - close(pubCh) + brokerEventHandler := &testBrokerEventHandler{ + HandleControlFunc: func(bytes []byte) error { + return nil + }, + HandlePublicationFunc: func(ch string, pub *Publication, sp StreamPosition, delta bool, prevPub *Publication) error { + resultsMu.Lock() + defer resultsMu.Unlock() + results = append(results, testDeltaPublishHandle{ + ch: ch, + pub: pub, + sp: sp, + prevPub: prevPub, + }) + c := atomic.AddInt64(&numPublications, 1) + if c == int64(msgNum) { + close(pubCh) + } + return nil + }, } - return nil - }, - } - _ = b1.Run(brokerEventHandler) + _ = b1.Run(brokerEventHandler) - require.NoError(t, b1.Subscribe(ch)) + require.NoError(t, b1.Subscribe(ch)) - node2, _ := New(Config{}) - s2, err := NewRedisShard(node2, redisConf) - require.NoError(t, err) + node2, _ := New(Config{}) + s2, err := NewRedisShard(node2, redisConf) + require.NoError(t, err) - b2, _ := NewRedisBroker(node2, RedisBrokerConfig{ - Prefix: prefix, - Shards: []*RedisShard{s2}, - }) - node2.SetBroker(b2) - _ = node2.Run() - defer func() { _ = node2.Shutdown(context.Background()) }() - defer stopRedisBroker(b2) + b2, _ := NewRedisBroker(node2, RedisBrokerConfig{ + Prefix: prefix, + Shards: []*RedisShard{s2}, + }) + node2.SetBroker(b2) + _ = node2.Run() + defer func() { _ = node2.Shutdown(context.Background()) }() + defer stopRedisBroker(b2) - for i := 0; i < msgNum; i++ { - sp, err := node2.Publish(ch, []byte("123"), - WithHistory(1, time.Minute), WithDelta(true)) - require.NoError(t, err) - require.Equal(t, sp.Offset, uint64(i+1)) - } + for i := 0; i < msgNum; i++ { + sp, err := node2.Publish(ch, []byte("123"), + WithHistory(1, time.Minute), WithDelta(true)) + require.NoError(t, err) + require.Equal(t, sp.Offset, uint64(i+1)) + } - select { - case <-pubCh: - case <-time.After(time.Second): - require.Fail(t, "timeout waiting for PUB/SUB message") - } + select { + case <-pubCh: + case <-time.After(time.Second): + require.Fail(t, "timeout waiting for PUB/SUB message") + } - resultsMu.Lock() - defer resultsMu.Unlock() - require.Len(t, results, msgNum) - require.Nil(t, results[0].prevPub) - require.NotNil(t, results[1].prevPub) + resultsMu.Lock() + defer resultsMu.Unlock() + require.Len(t, results, msgNum) + require.Nil(t, results[0].prevPub) + require.NotNil(t, results[1].prevPub) + }) + } } func TestRedisClusterShardedPubSub(t *testing.T) { @@ -1284,7 +1344,7 @@ func BenchmarkRedisSurvey(b *testing.B) { for _, tt := range benchSurveyTests { b.Run(tt.Name, func(b *testing.B) { prefix := getUniquePrefix() - redisConf := testRedisConf() + redisConf := testSingleRedisConf(6379) data := make([]byte, tt.DataSize) var nodes []*Node @@ -1369,10 +1429,10 @@ func BenchmarkRedisIndex(b *testing.B) { } func BenchmarkRedisPublish_1Ch(b *testing.B) { - for _, tt := range benchRedisTests { + for _, tt := range noHistoryBenchRedisTests { b.Run(tt.Name, func(b *testing.B) { node := benchNode(b) - broker := newTestRedisBroker(b, node, tt.UseStreams, tt.UseCluster) + broker := newTestRedisBroker(b, node, false, tt.UseCluster, tt.Port) defer func() { _ = node.Shutdown(context.Background()) }() defer stopRedisBroker(broker) rawData := []byte(`{"bench": true}`) @@ -1393,10 +1453,10 @@ func BenchmarkRedisPublish_1Ch(b *testing.B) { const benchmarkNumDifferentChannels = 1024 func BenchmarkRedisPublish_ManyCh(b *testing.B) { - for _, tt := range benchRedisTests { + for _, tt := range noHistoryRedisTests { b.Run(tt.Name, func(b *testing.B) { node := benchNode(b) - broker := newTestRedisBroker(b, node, tt.UseStreams, tt.UseCluster) + broker := newTestRedisBroker(b, node, false, tt.UseCluster, tt.Port) defer func() { _ = node.Shutdown(context.Background()) }() defer stopRedisBroker(broker) rawData := []byte(`{"bench": true}`) @@ -1418,10 +1478,10 @@ func BenchmarkRedisPublish_ManyCh(b *testing.B) { } func BenchmarkRedisPublish_History_1Ch(b *testing.B) { - for _, tt := range benchRedisTests { + for _, tt := range historyBenchRedisTests { b.Run(tt.Name, func(b *testing.B) { node := benchNode(b) - broker := newTestRedisBroker(b, node, tt.UseStreams, tt.UseCluster) + broker := newTestRedisBroker(b, node, tt.UseStreams, tt.UseCluster, tt.Port) defer func() { _ = node.Shutdown(context.Background()) }() defer stopRedisBroker(broker) rawData := []byte(`{"bench": true}`) @@ -1445,10 +1505,10 @@ func BenchmarkRedisPublish_History_1Ch(b *testing.B) { } func BenchmarkRedisPub_History_ManyCh(b *testing.B) { - for _, tt := range benchRedisTests { + for _, tt := range historyBenchRedisTests { b.Run(tt.Name, func(b *testing.B) { node := benchNode(b) - broker := newTestRedisBroker(b, node, tt.UseStreams, tt.UseCluster) + broker := newTestRedisBroker(b, node, tt.UseStreams, tt.UseCluster, tt.Port) defer func() { _ = node.Shutdown(context.Background()) }() defer stopRedisBroker(broker) rawData := []byte(`{"bench": true}`) @@ -1475,21 +1535,10 @@ func BenchmarkRedisPub_History_ManyCh(b *testing.B) { } func BenchmarkRedisSubscribe(b *testing.B) { - type test struct { - Name string - UseCluster bool - } - tests := []test{ - {"no_cluster", false}, - } - if os.Getenv("CENTRIFUGE_REDIS_CLUSTER_BENCHMARKS") != "" { - tests = append(tests, test{"in_cluster", true}) - } - - for _, tt := range tests { + for _, tt := range noHistoryRedisTests { b.Run(tt.Name, func(b *testing.B) { node := benchNode(b) - broker := newTestRedisBroker(b, node, false, tt.UseCluster) + broker := newTestRedisBroker(b, node, false, tt.UseCluster, tt.Port) defer func() { _ = node.Shutdown(context.Background()) }() defer stopRedisBroker(broker) i := int32(0) @@ -1509,10 +1558,10 @@ func BenchmarkRedisSubscribe(b *testing.B) { } func BenchmarkRedisHistory_1Ch(b *testing.B) { - for _, tt := range benchRedisTests { + for _, tt := range historyBenchRedisTests { b.Run(tt.Name, func(b *testing.B) { node := benchNode(b) - broker := newTestRedisBroker(b, node, tt.UseStreams, tt.UseCluster) + broker := newTestRedisBroker(b, node, tt.UseStreams, tt.UseCluster, tt.Port) defer func() { _ = node.Shutdown(context.Background()) }() defer stopRedisBroker(broker) rawData := []byte("{}") @@ -1535,10 +1584,10 @@ func BenchmarkRedisHistory_1Ch(b *testing.B) { } func BenchmarkRedisRecover_1Ch(b *testing.B) { - for _, tt := range benchRedisTests { + for _, tt := range historyBenchRedisTests { b.Run(tt.Name, func(b *testing.B) { node := benchNode(b) - broker := newTestRedisBroker(b, node, tt.UseStreams, tt.UseCluster) + broker := newTestRedisBroker(b, node, tt.UseStreams, tt.UseCluster, tt.Port) defer func() { _ = node.Shutdown(context.Background()) }() defer stopRedisBroker(broker) rawData := []byte("{}") @@ -1570,12 +1619,12 @@ func BenchmarkRedisRecover_1Ch(b *testing.B) { } } -func nodeWithRedisBroker(tb testing.TB, useStreams bool, useCluster bool) *Node { +func nodeWithRedisBroker(tb testing.TB, useStreams bool, useCluster bool, port int) *Node { n, err := New(Config{}) if err != nil { tb.Fatal(err) } - newTestRedisBroker(tb, n, useStreams, useCluster) + newTestRedisBroker(tb, n, useStreams, useCluster, port) n.OnConnect(func(client *Client) { client.OnSubscribe(func(e SubscribeEvent, cb SubscribeCallback) { cb(SubscribeReply{}, nil) @@ -1587,20 +1636,20 @@ func nodeWithRedisBroker(tb testing.TB, useStreams bool, useCluster bool) *Node return n } -func testRedisClientSubscribeRecover(t *testing.T, tt recoverTest, useStreams bool, useCluster bool) { - node := nodeWithRedisBroker(t, useStreams, useCluster) - node.config.RecoveryMaxPublicationLimit = tt.Limit +func testRedisClientSubscribeRecover(t *testing.T, tt historyRedisTest, rt recoverTest) { + node := nodeWithRedisBroker(t, tt.UseStreams, tt.UseCluster, tt.Port) + node.config.RecoveryMaxPublicationLimit = rt.Limit defer func() { _ = node.Shutdown(context.Background()) }() defer stopRedisBroker(node.broker.(*RedisBroker)) channel := "test_recovery_redis_" + tt.Name - for i := 1; i <= tt.NumPublications; i++ { - _, err := node.Publish(channel, []byte(`{"n": `+strconv.Itoa(i)+`}`), WithHistory(tt.HistorySize, time.Duration(tt.HistoryTTLSeconds)*time.Second)) + for i := 1; i <= rt.NumPublications; i++ { + _, err := node.Publish(channel, []byte(`{"n": `+strconv.Itoa(i)+`}`), WithHistory(rt.HistorySize, time.Duration(rt.HistoryTTLSeconds)*time.Second)) require.NoError(t, err) } - time.Sleep(time.Duration(tt.Sleep) * time.Second) + time.Sleep(time.Duration(rt.Sleep) * time.Second) _, streamTop, err := node.broker.History(channel, HistoryOptions{ Filter: HistoryFilter{ @@ -1610,11 +1659,11 @@ func testRedisClientSubscribeRecover(t *testing.T, tt recoverTest, useStreams bo }) require.NoError(t, err) - historyResult, err := node.recoverHistory(channel, StreamPosition{tt.SinceOffset, streamTop.Epoch}, 0) + historyResult, err := node.recoverHistory(channel, StreamPosition{rt.SinceOffset, streamTop.Epoch}, 0) require.NoError(t, err) - recoveredPubs, recovered := isStreamRecovered(historyResult, tt.SinceOffset, streamTop.Epoch) - require.Equal(t, tt.NumRecovered, len(recoveredPubs)) - require.Equal(t, tt.Recovered, recovered) + recoveredPubs, recovered := isStreamRecovered(historyResult, rt.SinceOffset, streamTop.Epoch) + require.Equal(t, rt.NumRecovered, len(recoveredPubs)) + require.Equal(t, rt.Recovered, recovered) } var brokerRecoverTests = []recoverTest{ @@ -1630,43 +1679,21 @@ var brokerRecoverTests = []recoverTest{ {"from_same_position_in_expired_stream", 10, 1, 1, 1, 0, 3, 0, true, RecoveryModeStream}, } -func TestRedisClientSubscribeRecoverStreams(t *testing.T) { - for _, tt := range brokerRecoverTests { - t.Run(tt.Name, func(t *testing.T) { - testRedisClientSubscribeRecover(t, tt, true, false) - }) - } -} - -func TestRedisClientSubscribeRecoverLists(t *testing.T) { - for _, tt := range brokerRecoverTests { - t.Run(tt.Name, func(t *testing.T) { - testRedisClientSubscribeRecover(t, tt, false, false) - }) - } -} - -func TestRedisClientSubscribeRecoverStreamsCluster(t *testing.T) { - for _, tt := range brokerRecoverTests { - t.Run(tt.Name, func(t *testing.T) { - testRedisClientSubscribeRecover(t, tt, true, true) - }) - } -} - -func TestRedisClientSubscribeRecoverListsCluster(t *testing.T) { - for _, tt := range brokerRecoverTests { - t.Run(tt.Name, func(t *testing.T) { - testRedisClientSubscribeRecover(t, tt, false, true) - }) +func TestRedisClientSubscribeRecover(t *testing.T) { + for _, tt := range historyRedisTests { + for _, rt := range brokerRecoverTests { + t.Run(tt.Name+"_"+rt.Name, func(t *testing.T) { + testRedisClientSubscribeRecover(t, tt, rt) + }) + } } } func TestRedisHistoryIteration(t *testing.T) { - for _, tt := range redisTests { + for _, tt := range historyRedisTests { t.Run(tt.Name, func(t *testing.T) { node := testNode(t) - broker := newTestRedisBroker(t, node, tt.UseStreams, tt.UseCluster) + broker := newTestRedisBroker(t, node, tt.UseStreams, tt.UseCluster, tt.Port) defer func() { _ = node.Shutdown(context.Background()) }() defer stopRedisBroker(broker) it := historyIterationTest{100, 5} @@ -1677,10 +1704,10 @@ func TestRedisHistoryIteration(t *testing.T) { } func TestRedisHistoryReversedNoMetaYet(t *testing.T) { - for _, tt := range redisTests { + for _, tt := range historyRedisTests { t.Run(tt.Name, func(t *testing.T) { node := testNode(t) - broker := newTestRedisBroker(t, node, tt.UseStreams, tt.UseCluster) + broker := newTestRedisBroker(t, node, tt.UseStreams, tt.UseCluster, tt.Port) defer func() { _ = node.Shutdown(context.Background()) }() defer stopRedisBroker(broker) pubs, sp, err := broker.History( @@ -1709,13 +1736,13 @@ func TestRedisHistoryReversedNoMetaYet(t *testing.T) { } func TestRedisHistoryIterationReverse(t *testing.T) { - for _, tt := range redisTests { + for _, tt := range historyRedisTests { t.Run(tt.Name, func(t *testing.T) { if !tt.UseStreams || tt.UseCluster { t.Skip() } node := testNode(t) - broker := newTestRedisBroker(t, node, tt.UseStreams, tt.UseCluster) + broker := newTestRedisBroker(t, node, tt.UseStreams, tt.UseCluster, tt.Port) defer func() { _ = node.Shutdown(context.Background()) }() defer stopRedisBroker(broker) it := historyIterationTest{100, 5} @@ -1726,10 +1753,10 @@ func TestRedisHistoryIterationReverse(t *testing.T) { } func BenchmarkRedisHistoryIteration(b *testing.B) { - for _, tt := range benchRedisTests { + for _, tt := range historyBenchRedisTests { b.Run(tt.Name, func(b *testing.B) { node := benchNode(b) - broker := newTestRedisBroker(b, node, tt.UseStreams, tt.UseCluster) + broker := newTestRedisBroker(b, node, tt.UseStreams, tt.UseCluster, tt.Port) defer func() { _ = node.Shutdown(context.Background()) }() defer stopRedisBroker(broker) it := historyIterationTest{10000, 100} @@ -1746,18 +1773,22 @@ type throughputTest struct { NumPubSubShards int NumPubSubSubscribers int NumPubSubProcessors int + Port int } var throughputTests = []throughputTest{ - {1, 0, 0}, - {2, 0, 0}, - {4, 0, 0}, + {1, 0, 0, 6379}, + {2, 0, 0, 6379}, + {4, 0, 0, 6379}, + {1, 0, 0, 7379}, + {2, 0, 0, 7379}, + {4, 0, 0, 7379}, } func BenchmarkPubSubThroughput(b *testing.B) { for _, tt := range throughputTests { - b.Run(fmt.Sprintf("%dsh_%dsub_%dproc", tt.NumPubSubShards, tt.NumPubSubSubscribers, tt.NumPubSubProcessors), func(b *testing.B) { - redisConf := testRedisConf() + b.Run(fmt.Sprintf("%dsh_%dsub_%dproc_%d", tt.NumPubSubShards, tt.NumPubSubSubscribers, tt.NumPubSubProcessors, tt.Port), func(b *testing.B) { + redisConf := testSingleRedisConf(tt.Port) node1, _ := New(Config{}) defer func() { _ = node1.Shutdown(context.Background()) }() diff --git a/docker-compose.yml b/docker-compose.yml index c0578ee3..bf96ae7f 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -5,6 +5,10 @@ services: image: redis:${REDIS_VERSION:-7}-alpine ports: - "6379:6379" + dragonflydb: + image: docker.dragonflydb.io/dragonflydb/dragonfly:v1.18.1 + ports: + - "7379:6379" sentinel: image: redis:${REDIS_VERSION:-7}-alpine entrypoint: diff --git a/emulation_integration_test.go b/emulation_integration_test.go index 66d6d9ae..7cdc19b0 100644 --- a/emulation_integration_test.go +++ b/emulation_integration_test.go @@ -35,7 +35,7 @@ func TestEmulation_DifferentNodes(t *testing.T) { }) }) - redisConf := testRedisConf() + redisConf := testSingleRedisConf() s, err := NewRedisShard(n1, redisConf) require.NoError(t, err) diff --git a/presence_redis_test.go b/presence_redis_test.go index cf916086..afcacbb0 100644 --- a/presence_redis_test.go +++ b/presence_redis_test.go @@ -13,11 +13,11 @@ import ( "github.com/stretchr/testify/require" ) -func newTestRedisPresenceManager(tb testing.TB, n *Node, useCluster bool, userMapping bool) *RedisPresenceManager { +func newTestRedisPresenceManager(tb testing.TB, n *Node, useCluster bool, userMapping bool, port int) *RedisPresenceManager { if useCluster { return NewTestRedisPresenceManagerClusterWithPrefix(tb, n, getUniquePrefix(), userMapping) } - return NewTestRedisPresenceManagerWithPrefix(tb, n, getUniquePrefix(), userMapping) + return NewTestRedisPresenceManagerWithPrefix(tb, n, getUniquePrefix(), userMapping, port) } func stopRedisPresenceManager(pm *RedisPresenceManager) { @@ -26,8 +26,8 @@ func stopRedisPresenceManager(pm *RedisPresenceManager) { } } -func NewTestRedisPresenceManagerWithPrefix(tb testing.TB, n *Node, prefix string, userMapping bool) *RedisPresenceManager { - redisConf := testRedisConf() +func NewTestRedisPresenceManagerWithPrefix(tb testing.TB, n *Node, prefix string, userMapping bool, port int) *RedisPresenceManager { + redisConf := testSingleRedisConf(port) s, err := NewRedisShard(n, redisConf) require.NoError(tb, err) pm, err := NewRedisPresenceManager(n, RedisPresenceManagerConfig{ @@ -73,19 +73,34 @@ func NewTestRedisPresenceManagerClusterWithPrefix(tb testing.TB, n *Node, prefix return pm } -var redisPresenceTests = []struct { +type redisPresenceTest struct { Name string UseCluster bool -}{ - {"with_cluster", true}, - {"without_cluster", false}, + Port int +} + +var redisPresenceTests = []redisPresenceTest{ + {"rd_single", false, 6379}, + {"df_single", false, 7379}, + {"rd_cluster", true, 0}, +} + +func excludeClusterPresenceTests(tests []redisPresenceTest) []redisPresenceTest { + var res []redisPresenceTest + for _, t := range tests { + if t.UseCluster { + continue + } + res = append(res, t) + } + return res } func TestRedisPresenceManager(t *testing.T) { for _, tt := range redisPresenceTests { t.Run(tt.Name, func(t *testing.T) { node := testNode(t) - pm := newTestRedisPresenceManager(t, node, tt.UseCluster, false) + pm := newTestRedisPresenceManager(t, node, tt.UseCluster, false, tt.Port) defer func() { _ = node.Shutdown(context.Background()) }() defer stopRedisPresenceManager(pm) @@ -115,7 +130,7 @@ func TestRedisPresenceManagerWithUserMapping(t *testing.T) { for _, tt := range redisPresenceTests { t.Run(tt.Name, func(t *testing.T) { node := testNode(t) - pm := newTestRedisPresenceManager(t, node, tt.UseCluster, true) + pm := newTestRedisPresenceManager(t, node, tt.UseCluster, true, tt.Port) defer func() { _ = node.Shutdown(context.Background()) }() defer stopRedisPresenceManager(pm) @@ -191,7 +206,7 @@ func TestRedisPresenceManagerWithUserMappingExpire(t *testing.T) { t.Run(tt.Name, func(t *testing.T) { t.Parallel() node := testNode(t) - pm := newTestRedisPresenceManager(t, node, tt.UseCluster, true) + pm := newTestRedisPresenceManager(t, node, tt.UseCluster, true, tt.Port) pm.config.PresenceTTL = 2 * time.Second defer func() { _ = node.Shutdown(context.Background()) }() defer stopRedisPresenceManager(pm) @@ -258,10 +273,10 @@ func TestRedisPresenceManagerWithUserMappingExpire(t *testing.T) { } func BenchmarkRedisAddPresence_1Ch(b *testing.B) { - for _, tt := range benchRedisTests { + for _, tt := range redisPresenceTests { b.Run(tt.Name, func(b *testing.B) { node := benchNode(b) - pm := newTestRedisPresenceManager(b, node, tt.UseCluster, false) + pm := newTestRedisPresenceManager(b, node, tt.UseCluster, false, tt.Port) defer func() { _ = node.Shutdown(context.Background()) }() defer stopRedisPresenceManager(pm) b.SetParallelism(getBenchParallelism()) @@ -279,10 +294,10 @@ func BenchmarkRedisAddPresence_1Ch(b *testing.B) { } func BenchmarkRedisAddPresence_ManyCh(b *testing.B) { - for _, tt := range benchRedisTests { + for _, tt := range redisPresenceTests { b.Run(tt.Name, func(b *testing.B) { node := benchNode(b) - pm := newTestRedisPresenceManager(b, node, tt.UseCluster, false) + pm := newTestRedisPresenceManager(b, node, tt.UseCluster, false, tt.Port) defer func() { _ = node.Shutdown(context.Background()) }() defer stopRedisPresenceManager(pm) b.SetParallelism(getBenchParallelism()) @@ -303,10 +318,10 @@ func BenchmarkRedisAddPresence_ManyCh(b *testing.B) { } func BenchmarkRedisPresence_1Ch(b *testing.B) { - for _, tt := range benchRedisTests { + for _, tt := range redisPresenceTests { b.Run(tt.Name, func(b *testing.B) { node := benchNode(b) - pm := newTestRedisPresenceManager(b, node, tt.UseCluster, false) + pm := newTestRedisPresenceManager(b, node, tt.UseCluster, false, tt.Port) defer func() { _ = node.Shutdown(context.Background()) }() defer stopRedisPresenceManager(pm) b.SetParallelism(getBenchParallelism()) @@ -325,10 +340,10 @@ func BenchmarkRedisPresence_1Ch(b *testing.B) { } func BenchmarkRedisPresence_ManyCh(b *testing.B) { - for _, tt := range benchRedisTests { + for _, tt := range redisPresenceTests { b.Run(tt.Name, func(b *testing.B) { node := benchNode(b) - pm := newTestRedisPresenceManager(b, node, tt.UseCluster, false) + pm := newTestRedisPresenceManager(b, node, tt.UseCluster, false, tt.Port) defer func() { _ = node.Shutdown(context.Background()) }() defer stopRedisPresenceManager(pm) b.SetParallelism(getBenchParallelism()) @@ -350,44 +365,48 @@ func BenchmarkRedisPresence_ManyCh(b *testing.B) { } func BenchmarkRedisPresenceStatsWithMapping(b *testing.B) { - node := benchNode(b) - pm := newTestRedisPresenceManager(b, node, false, true) - defer func() { _ = node.Shutdown(context.Background()) }() - defer stopRedisPresenceManager(pm) - b.SetParallelism(getBenchParallelism()) - - sem := make(chan struct{}, 100) - numClients := 100_000 - - var wg sync.WaitGroup - wg.Add(numClients) - for i := 0; i < numClients; i++ { - sem <- struct{}{} - i := i - go func() { - defer wg.Done() - defer func() { - <-sem - }() - clientID := "uid" + strconv.Itoa(i) - userID := "user" + strconv.Itoa(i) - _ = pm.AddPresence("channel", "uid"+strconv.Itoa(i), &ClientInfo{ - ClientID: clientID, - UserID: userID, - }) - }() - } - wg.Wait() + for _, tt := range excludeClusterPresenceTests(redisPresenceTests) { + b.Run(tt.Name, func(b *testing.B) { + node := benchNode(b) + pm := newTestRedisPresenceManager(b, node, false, true, tt.Port) + defer func() { _ = node.Shutdown(context.Background()) }() + defer stopRedisPresenceManager(pm) + b.SetParallelism(getBenchParallelism()) - b.ResetTimer() - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - s, err := pm.PresenceStats("channel") - if err != nil { - b.Fatal(err) + sem := make(chan struct{}, 100) + numClients := 100_000 + + var wg sync.WaitGroup + wg.Add(numClients) + for i := 0; i < numClients; i++ { + sem <- struct{}{} + i := i + go func() { + defer wg.Done() + defer func() { + <-sem + }() + clientID := "uid" + strconv.Itoa(i) + userID := "user" + strconv.Itoa(i) + _ = pm.AddPresence("channel", "uid"+strconv.Itoa(i), &ClientInfo{ + ClientID: clientID, + UserID: userID, + }) + }() } - require.Equal(b, s.NumClients, numClients) - require.Equal(b, s.NumUsers, numClients) - } - }) + wg.Wait() + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + s, err := pm.PresenceStats("channel") + if err != nil { + b.Fatal(err) + } + require.Equal(b, s.NumClients, numClients) + require.Equal(b, s.NumUsers, numClients) + } + }) + }) + } }