Skip to content

Commit

Permalink
feat(telemetry): add discovered_gateways_count (#3783)
Browse files Browse the repository at this point in the history
  • Loading branch information
czeslavo committed Mar 21, 2023
1 parent 1c19ee7 commit 9efdffe
Show file tree
Hide file tree
Showing 7 changed files with 229 additions and 39 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ Adding a new version? You'll need three changes:
- Konnect Runtime Group's nodes are reactively updated on each discovered Gateway clients
change.
[#3727](https://github.com/Kong/kubernetes-ingress-controller/pull/3727)
- Telemetry reports now include a number of discovered Gateways when the Gateway Discovery
feature is turned on.
[#3783](https://github.com/Kong/kubernetes-ingress-controller/pull/3783)

### Fixed

Expand Down
6 changes: 6 additions & 0 deletions internal/dataplane/clients_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@ func (c *AdminAPIClientsManager) GatewayClients() []*adminapi.Client {
return copied
}

func (c *AdminAPIClientsManager) GatewayClientsCount() int {
c.lock.RLock()
defer c.lock.RUnlock()
return len(c.gatewayClients)
}

// SubscribeToGatewayClientsChanges returns a channel that will receive a notification on every Gateway clients update.
// Can be used to receive a signal when immediate reaction to the changes is needed. After receiving the notification,
// GatewayClients call will return an already updated slice of clients.
Expand Down
2 changes: 2 additions & 0 deletions internal/dataplane/clients_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,12 +256,14 @@ func TestAdminAPIClientsManager_Clients(t *testing.T) {
)
require.NoError(t, err)
require.Len(t, m.GatewayClients(), 1, "expecting one initial client")
require.Equal(t, m.GatewayClientsCount(), 1, "expecting one initial client")
require.Len(t, m.AllClients(), 1, "expecting one initial client")

konnectTestClient, err := adminapi.NewTestClient("https://us.api.konghq.tech")
require.NoError(t, err)
m.SetKonnectClient(konnectTestClient)
require.Len(t, m.GatewayClients(), 1, "konnect client should not be returned from GatewayClients")
require.Equal(t, m.GatewayClientsCount(), 1, "konnect client should not be counted in GatewayClientsCount")
require.Len(t, m.AllClients(), 2, "konnect client should be returned from AllClients")
}

Expand Down
14 changes: 12 additions & 2 deletions internal/manager/telemetry/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/kong/kubernetes-ingress-controller/v2/internal/manager/telemetry/workflows"
"github.com/kong/kubernetes-ingress-controller/v2/internal/util"
)

Expand All @@ -41,7 +42,7 @@ type ReportValues struct {
}

// CreateManager creates telemetry manager using the provided rest.Config.
func CreateManager(restConfig *rest.Config, fixedPayload Payload, rv ReportValues) (telemetry.Manager, error) {
func CreateManager(restConfig *rest.Config, gatewaysCounter workflows.DiscoveredGatewaysCounter, fixedPayload Payload, rv ReportValues) (telemetry.Manager, error) {
logger := logrusr.New(logrus.New())

k, err := kubernetes.NewForConfig(restConfig)
Expand All @@ -54,7 +55,7 @@ func CreateManager(restConfig *rest.Config, fixedPayload Payload, rv ReportValue
}
dyn := dynamic.New(k.Discovery().RESTClient())

m, err := createManager(k, dyn, cl, fixedPayload, rv,
m, err := createManager(k, dyn, cl, gatewaysCounter, fixedPayload, rv,
telemetry.OptManagerPeriod(telemetryPeriod),
telemetry.OptManagerLogger(logger),
)
Expand All @@ -79,6 +80,7 @@ func createManager(
k kubernetes.Interface,
dyn dynamic.Interface,
cl client.Client,
gatewaysCounter workflows.DiscoveredGatewaysCounter,
fixedPayload Payload,
rv ReportValues,
opts ...telemetry.OptManager,
Expand Down Expand Up @@ -164,6 +166,14 @@ func createManager(
m.AddWorkflow(w)
}

if rv.GatewayServiceDiscoveryEnabled {
w, err := workflows.NewGatewayDiscoveryWorkflow(gatewaysCounter)
if err != nil {
return nil, fmt.Errorf("failed to create gateway discovery workflow: %w", err)
}
m.AddWorkflow(w)
}

return m, nil
}

Expand Down
168 changes: 134 additions & 34 deletions internal/manager/telemetry/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ import (
gatewayv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"
)

type mockGatewaysCounter int

func (m mockGatewaysCounter) GatewayClientsCount() int {
return int(m)
}

func TestCreateManager(t *testing.T) {
var (
payload = types.ProviderReport{
Expand All @@ -37,7 +43,6 @@ func TestCreateManager(t *testing.T) {
"gateway": true,
"knative": false,
}
ctx = context.Background()
publishService = apitypes.NamespacedName{
Namespace: "kong",
Name: "kong-proxy",
Expand Down Expand Up @@ -82,18 +87,138 @@ func TestCreateManager(t *testing.T) {
Platform: "linux/amd64",
}

mgr, err := createManager(
reportValues := ReportValues{
FeatureGates: featureGates,
MeshDetection: true,
PublishServiceNN: publishService,
KonnectSyncEnabled: true,
GatewayServiceDiscoveryEnabled: true,
}

runManagerTest(
t,
k8sclient,
dyn,
ctrlClient,
mockGatewaysCounter(5),
payload,
ReportValues{
FeatureGates: featureGates,
MeshDetection: true,
PublishServiceNN: publishService,
KonnectSyncEnabled: true,
GatewayServiceDiscoveryEnabled: true,
reportValues,
func(t *testing.T, actualReport string) {
require.Equal(t,
fmt.Sprintf(
"<14>"+
"signal=test-signal;"+
"db=off;"+
"feature-gateway-service-discovery=true;"+
"feature-gateway=true;"+
"feature-knative=false;"+
"feature-konnect-sync=true;"+
"hn=%s;"+
"kv=3.1.1;"+
"uptime=0;"+
"discovered_gateways_count=5;"+
"k8s_arch=linux/amd64;"+
"k8s_provider=UNKNOWN;"+
"k8sv=v1.24.5;"+
"k8sv_semver=v1.24.5;"+
"k8s_nodes_count=4;"+
"k8s_pods_count=8;"+
"k8s_services_count=17;"+
"kinm=c3,l2,l3,l4;"+
"mdep=i3,k3,km3,l3,t3;"+
"mdist=all17,c1,i2,k1,km1,l2,t1;"+
"\n",
hostname),
actualReport,
)
},
)
}

func TestCreateManager_GatewayDiscoverySpecifics(t *testing.T) {
testCases := []struct {
name string
gatewayServiceDiscoveryEnabled bool
expectReportToContain []string
expectReportToNotContain []string
}{
{
name: "gateway service discovery disabled",
gatewayServiceDiscoveryEnabled: false,
expectReportToContain: []string{
"feature-gateway-service-discovery=false",
},
expectReportToNotContain: []string{
"discovered_gateways_count=",
},
},
{
name: "gateway service discovery enabled",
gatewayServiceDiscoveryEnabled: true,
expectReportToContain: []string{
"feature-gateway-service-discovery=true",
"discovered_gateways_count=5",
},
},
}

scheme := prepareScheme(t)
dyn := testdynclient.NewSimpleDynamicClient(scheme)
ctrlClient := fakeclient.NewClientBuilder().Build()
k8sclient := testk8sclient.NewSimpleClientset()

for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

runManagerTest(
t,
k8sclient,
dyn,
ctrlClient,
mockGatewaysCounter(5),
Payload{},
ReportValues{
GatewayServiceDiscoveryEnabled: tc.gatewayServiceDiscoveryEnabled,
},
func(t *testing.T, actualReport string) {
for _, expected := range tc.expectReportToContain {
require.Contains(t, actualReport, expected)
}
for _, expected := range tc.expectReportToNotContain {
require.NotContains(t, actualReport, expected)
}
})
})
}
}

// runManagerTest is a helper function that creates a manager with the dependencies provided as arguments, and
// calls the testFn with the actual report string it receives after triggering `test-signal` execution.
func runManagerTest(
t *testing.T,

// Following arguments map with the arguments of the createManager function, but instead of interfaces,
// concrete test types are used.
k8sclient *testk8sclient.Clientset,
dyn *testdynclient.FakeDynamicClient,
ctrlClient client.Client,
gatewaysCounter mockGatewaysCounter,
payload Payload,
reportValues ReportValues,

// testFn is a function that will be called with the actual report string.
testFn func(t *testing.T, actualReport string),
) {
ctx := context.Background()
mgr, err := createManager(
k8sclient,
dyn,
ctrlClient,
gatewaysCounter,
payload,
reportValues,
telemetry.OptManagerPeriod(time.Hour),
telemetry.OptManagerLogger(logr.Discard()),
)
Expand All @@ -112,32 +237,7 @@ func TestCreateManager(t *testing.T) {
require.NoError(t, mgr.TriggerExecute(ctx, "test-signal"))
select {
case b := <-ch:
require.Equal(t,
fmt.Sprintf(
"<14>"+
"signal=test-signal;"+
"db=off;"+
"feature-gateway-service-discovery=true;"+
"feature-gateway=true;"+
"feature-knative=false;"+
"feature-konnect-sync=true;"+
"hn=%s;"+
"kv=3.1.1;"+
"uptime=0;"+
"k8s_arch=linux/amd64;"+
"k8s_provider=UNKNOWN;"+
"k8sv=v1.24.5;"+
"k8sv_semver=v1.24.5;"+
"k8s_nodes_count=4;"+
"k8s_pods_count=8;"+
"k8s_services_count=17;"+
"kinm=c3,l2,l3,l4;"+
"mdep=i3,k3,km3,l3,t3;"+
"mdist=all17,c1,i2,k1,km1,l2,t1;"+
"\n",
hostname),
string(b),
)
testFn(t, string(b))
case <-time.After(time.Second):
t.Fatal("we should get a report but we didn't")
}
Expand Down
12 changes: 9 additions & 3 deletions internal/manager/telemetry/reports.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,24 @@ import (
"github.com/google/uuid"
"k8s.io/client-go/rest"

"github.com/kong/kubernetes-ingress-controller/v2/internal/dataplane"
"github.com/kong/kubernetes-ingress-controller/v2/internal/adminapi"
"github.com/kong/kubernetes-ingress-controller/v2/internal/manager/metadata"
)

// GatewayClientsProvider is an interface that provides clients for the currently discovered Gateway instances.
type GatewayClientsProvider interface {
GatewayClients() []*adminapi.Client
GatewayClientsCount() int
}

// SetupAnonymousReports sets up and starts the anonymous reporting and returns
// a cleanup function and an error.
// The caller is responsible to call the returned function - when the returned
// error is not nil - to stop the reports sending.
func SetupAnonymousReports(
ctx context.Context,
kubeCfg *rest.Config,
clientsProvider dataplane.AdminAPIClientsProvider,
clientsProvider GatewayClientsProvider,
rv ReportValues,
) (func(), error) {
// if anonymous reports are enabled this helps provide Kong with insights about usage of the ingress controller
Expand Down Expand Up @@ -61,7 +67,7 @@ func SetupAnonymousReports(
"id": uuid.NewString(), // universal unique identifier for this system
}

tMgr, err := CreateManager(kubeCfg, fixedPayload, rv)
tMgr, err := CreateManager(kubeCfg, clientsProvider, fixedPayload, rv)
if err != nil {
return nil, fmt.Errorf("failed to create anonymous reports manager: %w", err)
}
Expand Down
63 changes: 63 additions & 0 deletions internal/manager/telemetry/workflows/gateway_discovery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package workflows

import (
"context"
"errors"
"fmt"

"github.com/kong/kubernetes-telemetry/pkg/provider"
"github.com/kong/kubernetes-telemetry/pkg/telemetry"
"github.com/kong/kubernetes-telemetry/pkg/types"
)

const GatewayDiscoveryWorkflowName = "gateway_discovery"

// DiscoveredGatewaysCounter is an interface that allows to count currently discovered Gateways.
type DiscoveredGatewaysCounter interface {
GatewayClientsCount() int
}

func NewGatewayDiscoveryWorkflow(gatewaysCounter DiscoveredGatewaysCounter) (telemetry.Workflow, error) {
w := telemetry.NewWorkflow(GatewayDiscoveryWorkflowName)

discoveredGatewaysCountProvider, err := NewDiscoveredGatewaysCountProvider(gatewaysCounter)
if err != nil {
return nil, fmt.Errorf("failed to create discovered gateways count provider: %w", err)
}
w.AddProvider(discoveredGatewaysCountProvider)

return w, nil
}

// DiscoveredGatewaysCountProvider is a provider that reports the number of currently discovered Gateways.
type DiscoveredGatewaysCountProvider struct {
counter DiscoveredGatewaysCounter
}

func NewDiscoveredGatewaysCountProvider(counter DiscoveredGatewaysCounter) (*DiscoveredGatewaysCountProvider, error) {
if counter == nil {
return nil, errors.New("discovered gateways counter is required")
}

return &DiscoveredGatewaysCountProvider{counter: counter}, nil
}

const (
DiscoveredGatewaysCountProviderName = "discovered_gateways_count"
DiscoveredGatewaysCountProviderKind = provider.Kind(DiscoveredGatewaysCountProviderName)
DiscoveredGatewaysCountKey = types.ProviderReportKey(DiscoveredGatewaysCountProviderName)
)

func (d *DiscoveredGatewaysCountProvider) Name() string {
return DiscoveredGatewaysCountProviderName
}

func (d *DiscoveredGatewaysCountProvider) Kind() provider.Kind {
return DiscoveredGatewaysCountProviderKind
}

func (d *DiscoveredGatewaysCountProvider) Provide(context.Context) (types.ProviderReport, error) {
return types.ProviderReport{
DiscoveredGatewaysCountKey: d.counter.GatewayClientsCount(),
}, nil
}

0 comments on commit 9efdffe

Please sign in to comment.