Skip to content

Commit

Permalink
Add TestStreamIsActive
Browse files Browse the repository at this point in the history
Signed-off-by: Aaron Wislang <aaron.wislang@microsoft.com>
  • Loading branch information
asw101 committed Jan 25, 2022
1 parent b292728 commit 66c08d6
Showing 1 changed file with 138 additions and 2 deletions.
140 changes: 138 additions & 2 deletions scaler/handlers_test.go
Expand Up @@ -3,6 +3,8 @@ package main
import (
context "context"
"fmt"
"log"
"net"
"testing"
"time"

Expand All @@ -11,6 +13,8 @@ import (
"github.com/kedacore/http-add-on/pkg/routing"
externalscaler "github.com/kedacore/http-add-on/proto"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/test/bufconn"
)

func standardTarget() routing.Target {
Expand All @@ -22,7 +26,8 @@ func standardTarget() routing.Target {
123,
)
}
func TestIsActive(t *testing.T) {

func TestStreamIsActive(t *testing.T) {
type testCase struct {
name string
host string
Expand Down Expand Up @@ -88,9 +93,140 @@ func TestIsActive(t *testing.T) {
lggr := logr.Discard()
table := routing.NewTable()
ticker, pinger, err := newFakeQueuePinger(ctx, lggr)
r.NoError(err)
defer ticker.Stop()
tc.setup(table, pinger)

hdl := newImpl(
lggr,
pinger,
table,
123,
200,
)

bufSize := 1024 * 1024
lis := bufconn.Listen(bufSize)
grpcServer := grpc.NewServer()
externalscaler.RegisterExternalScalerServer(
grpcServer,
hdl,
)

go func() {
if err := grpcServer.Serve(lis); err != nil {
log.Fatalf("Server exited with error: %v", err)
}
}()

bufDialFunc := func(context.Context, string) (net.Conn, error) {
return lis.Dial()
}

conn, err := grpc.DialContext(ctx, "bufnet", grpc.WithContextDialer(bufDialFunc), grpc.WithInsecure())
if err != nil {
t.Fatalf("Failed to dial bufnet: %v", err)
}
defer conn.Close()

client := externalscaler.NewExternalScalerClient(conn)

testRef := &externalscaler.ScaledObjectRef{
ScalerMetadata: map[string]string{
"host": tc.host,
},
}

// First will see if we can establish the stream and handle this
// error.
streamClient, err := client.StreamIsActive(ctx, testRef)
if err != nil {
t.Fatalf("failed to create queue pinger: %v", err)
t.Fatalf("StreamIsActive failed: %v", err)
}

// Next, as in TestIsActive, we check for any error, expected
// or unexpected, for each table test.
res, err := streamClient.Recv()

if tc.expectedErr && err != nil {
return
} else if err != nil {
t.Fatalf("expected no error but got: %v", err)
}

if tc.expected != res.Result {
t.Fatalf("Expected IsActive result %v, got: %v", tc.expected, res.Result)
}
})
}
}

func TestIsActive(t *testing.T) {
type testCase struct {
name string
host string
expected bool
expectedErr bool
setup func(*routing.Table, *queuePinger)
}

testCases := []testCase{
{
name: "Simple host inactive",
host: t.Name(),
expected: false,
expectedErr: false,
setup: func(table *routing.Table, q *queuePinger) {
table.AddTarget(t.Name(), standardTarget())
q.pingMut.Lock()
defer q.pingMut.Unlock()
q.allCounts[t.Name()] = 0
},
},
{
name: "Host is 'interceptor'",
host: "interceptor",
expected: true,
expectedErr: false,
setup: func(*routing.Table, *queuePinger) {},
},
{
name: "Simple host active",
host: t.Name(),
expected: true,
expectedErr: false,
setup: func(table *routing.Table, q *queuePinger) {
table.AddTarget(t.Name(), standardTarget())
q.pingMut.Lock()
defer q.pingMut.Unlock()
q.allCounts[t.Name()] = 1
},
},
{
name: "No host present, but host in routing table",
host: t.Name(),
expected: false,
expectedErr: false,
setup: func(table *routing.Table, q *queuePinger) {
table.AddTarget(t.Name(), standardTarget())
},
},
{
name: "Host doesn't exist",
host: t.Name(),
expected: false,
expectedErr: true,
setup: func(*routing.Table, *queuePinger) {},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
r := require.New(t)
ctx := context.Background()
lggr := logr.Discard()
table := routing.NewTable()
ticker, pinger, err := newFakeQueuePinger(ctx, lggr)
r.NoError(err)
defer ticker.Stop()
tc.setup(table, pinger)
Expand Down

0 comments on commit 66c08d6

Please sign in to comment.