Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(telemetry): add discovered_gateways_count #3783

Merged
merged 2 commits into from
Mar 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ Adding a new version? You'll need three changes:
- Konnect Runtime Group's nodes are reactively updated on each discovered Gateway clients
change.
[#3727](https://github.com/Kong/kubernetes-ingress-controller/pull/3727)
- Telemetry reports now include a number of discovered Gateways when the Gateway Discovery
feature is turned on.
[#3783](https://github.com/Kong/kubernetes-ingress-controller/pull/3783)

### Fixed

Expand Down
6 changes: 6 additions & 0 deletions internal/dataplane/clients_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@ func (c *AdminAPIClientsManager) GatewayClients() []*adminapi.Client {
return copied
}

func (c *AdminAPIClientsManager) GatewayClientsCount() int {
c.lock.RLock()
defer c.lock.RUnlock()
return len(c.gatewayClients)
}

// SubscribeToGatewayClientsChanges returns a channel that will receive a notification on every Gateway clients update.
// Can be used to receive a signal when immediate reaction to the changes is needed. After receiving the notification,
// GatewayClients call will return an already updated slice of clients.
Expand Down
2 changes: 2 additions & 0 deletions internal/dataplane/clients_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,12 +256,14 @@ func TestAdminAPIClientsManager_Clients(t *testing.T) {
)
require.NoError(t, err)
require.Len(t, m.GatewayClients(), 1, "expecting one initial client")
require.Equal(t, m.GatewayClientsCount(), 1, "expecting one initial client")
require.Len(t, m.AllClients(), 1, "expecting one initial client")

konnectTestClient, err := adminapi.NewTestClient("https://us.api.konghq.tech")
require.NoError(t, err)
m.SetKonnectClient(konnectTestClient)
require.Len(t, m.GatewayClients(), 1, "konnect client should not be returned from GatewayClients")
require.Equal(t, m.GatewayClientsCount(), 1, "konnect client should not be counted in GatewayClientsCount")
require.Len(t, m.AllClients(), 2, "konnect client should be returned from AllClients")
}

Expand Down
14 changes: 12 additions & 2 deletions internal/manager/telemetry/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/kong/kubernetes-ingress-controller/v2/internal/manager/telemetry/workflows"
"github.com/kong/kubernetes-ingress-controller/v2/internal/util"
)

Expand All @@ -41,7 +42,7 @@ type ReportValues struct {
}

// CreateManager creates telemetry manager using the provided rest.Config.
func CreateManager(restConfig *rest.Config, fixedPayload Payload, rv ReportValues) (telemetry.Manager, error) {
func CreateManager(restConfig *rest.Config, gatewaysCounter workflows.DiscoveredGatewaysCounter, fixedPayload Payload, rv ReportValues) (telemetry.Manager, error) {
logger := logrusr.New(logrus.New())

k, err := kubernetes.NewForConfig(restConfig)
Expand All @@ -54,7 +55,7 @@ func CreateManager(restConfig *rest.Config, fixedPayload Payload, rv ReportValue
}
dyn := dynamic.New(k.Discovery().RESTClient())

m, err := createManager(k, dyn, cl, fixedPayload, rv,
m, err := createManager(k, dyn, cl, gatewaysCounter, fixedPayload, rv,
telemetry.OptManagerPeriod(telemetryPeriod),
telemetry.OptManagerLogger(logger),
)
Expand All @@ -79,6 +80,7 @@ func createManager(
k kubernetes.Interface,
dyn dynamic.Interface,
cl client.Client,
gatewaysCounter workflows.DiscoveredGatewaysCounter,
fixedPayload Payload,
rv ReportValues,
opts ...telemetry.OptManager,
Expand Down Expand Up @@ -164,6 +166,14 @@ func createManager(
m.AddWorkflow(w)
}

if rv.GatewayServiceDiscoveryEnabled {
w, err := workflows.NewGatewayDiscoveryWorkflow(gatewaysCounter)
if err != nil {
return nil, fmt.Errorf("failed to create gateway discovery workflow: %w", err)
}
m.AddWorkflow(w)
}

return m, nil
}

Expand Down
168 changes: 134 additions & 34 deletions internal/manager/telemetry/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ import (
gatewayv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"
)

type mockGatewaysCounter int

func (m mockGatewaysCounter) GatewayClientsCount() int {
return int(m)
}

func TestCreateManager(t *testing.T) {
var (
payload = types.ProviderReport{
Expand All @@ -37,7 +43,6 @@ func TestCreateManager(t *testing.T) {
"gateway": true,
"knative": false,
}
ctx = context.Background()
publishService = apitypes.NamespacedName{
Namespace: "kong",
Name: "kong-proxy",
Expand Down Expand Up @@ -82,18 +87,138 @@ func TestCreateManager(t *testing.T) {
Platform: "linux/amd64",
}

mgr, err := createManager(
reportValues := ReportValues{
FeatureGates: featureGates,
MeshDetection: true,
PublishServiceNN: publishService,
KonnectSyncEnabled: true,
GatewayServiceDiscoveryEnabled: true,
}

runManagerTest(
t,
k8sclient,
dyn,
ctrlClient,
mockGatewaysCounter(5),
payload,
ReportValues{
FeatureGates: featureGates,
MeshDetection: true,
PublishServiceNN: publishService,
KonnectSyncEnabled: true,
GatewayServiceDiscoveryEnabled: true,
reportValues,
func(t *testing.T, actualReport string) {
require.Equal(t,
fmt.Sprintf(
"<14>"+
"signal=test-signal;"+
"db=off;"+
"feature-gateway-service-discovery=true;"+
"feature-gateway=true;"+
"feature-knative=false;"+
"feature-konnect-sync=true;"+
"hn=%s;"+
"kv=3.1.1;"+
"uptime=0;"+
"discovered_gateways_count=5;"+
"k8s_arch=linux/amd64;"+
"k8s_provider=UNKNOWN;"+
"k8sv=v1.24.5;"+
"k8sv_semver=v1.24.5;"+
"k8s_nodes_count=4;"+
"k8s_pods_count=8;"+
"k8s_services_count=17;"+
"kinm=c3,l2,l3,l4;"+
"mdep=i3,k3,km3,l3,t3;"+
"mdist=all17,c1,i2,k1,km1,l2,t1;"+
"\n",
hostname),
actualReport,
)
},
)
}

func TestCreateManager_GatewayDiscoverySpecifics(t *testing.T) {
testCases := []struct {
name string
gatewayServiceDiscoveryEnabled bool
expectReportToContain []string
expectReportToNotContain []string
}{
{
name: "gateway service discovery disabled",
gatewayServiceDiscoveryEnabled: false,
expectReportToContain: []string{
"feature-gateway-service-discovery=false",
},
expectReportToNotContain: []string{
"discovered_gateways_count=",
},
},
{
name: "gateway service discovery enabled",
gatewayServiceDiscoveryEnabled: true,
expectReportToContain: []string{
"feature-gateway-service-discovery=true",
"discovered_gateways_count=5",
},
},
}

scheme := prepareScheme(t)
dyn := testdynclient.NewSimpleDynamicClient(scheme)
ctrlClient := fakeclient.NewClientBuilder().Build()
k8sclient := testk8sclient.NewSimpleClientset()

for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

runManagerTest(
t,
k8sclient,
dyn,
ctrlClient,
mockGatewaysCounter(5),
Payload{},
ReportValues{
GatewayServiceDiscoveryEnabled: tc.gatewayServiceDiscoveryEnabled,
},
func(t *testing.T, actualReport string) {
for _, expected := range tc.expectReportToContain {
require.Contains(t, actualReport, expected)
}
for _, expected := range tc.expectReportToNotContain {
require.NotContains(t, actualReport, expected)
}
})
})
}
}

// runManagerTest is a helper function that creates a manager with the dependencies provided as arguments, and
// calls the testFn with the actual report string it receives after triggering `test-signal` execution.
func runManagerTest(
t *testing.T,

// Following arguments map with the arguments of the createManager function, but instead of interfaces,
// concrete test types are used.
k8sclient *testk8sclient.Clientset,
dyn *testdynclient.FakeDynamicClient,
ctrlClient client.Client,
gatewaysCounter mockGatewaysCounter,
payload Payload,
reportValues ReportValues,

// testFn is a function that will be called with the actual report string.
testFn func(t *testing.T, actualReport string),
) {
ctx := context.Background()
mgr, err := createManager(
k8sclient,
dyn,
ctrlClient,
gatewaysCounter,
payload,
reportValues,
telemetry.OptManagerPeriod(time.Hour),
telemetry.OptManagerLogger(logr.Discard()),
)
Expand All @@ -112,32 +237,7 @@ func TestCreateManager(t *testing.T) {
require.NoError(t, mgr.TriggerExecute(ctx, "test-signal"))
select {
case b := <-ch:
require.Equal(t,
fmt.Sprintf(
"<14>"+
"signal=test-signal;"+
"db=off;"+
"feature-gateway-service-discovery=true;"+
"feature-gateway=true;"+
"feature-knative=false;"+
"feature-konnect-sync=true;"+
"hn=%s;"+
"kv=3.1.1;"+
"uptime=0;"+
"k8s_arch=linux/amd64;"+
"k8s_provider=UNKNOWN;"+
"k8sv=v1.24.5;"+
"k8sv_semver=v1.24.5;"+
"k8s_nodes_count=4;"+
"k8s_pods_count=8;"+
"k8s_services_count=17;"+
"kinm=c3,l2,l3,l4;"+
"mdep=i3,k3,km3,l3,t3;"+
"mdist=all17,c1,i2,k1,km1,l2,t1;"+
"\n",
hostname),
string(b),
)
testFn(t, string(b))
case <-time.After(time.Second):
t.Fatal("we should get a report but we didn't")
}
Expand Down
12 changes: 9 additions & 3 deletions internal/manager/telemetry/reports.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,24 @@ import (
"github.com/google/uuid"
"k8s.io/client-go/rest"

"github.com/kong/kubernetes-ingress-controller/v2/internal/dataplane"
"github.com/kong/kubernetes-ingress-controller/v2/internal/adminapi"
"github.com/kong/kubernetes-ingress-controller/v2/internal/manager/metadata"
)

// GatewayClientsProvider is an interface that provides clients for the currently discovered Gateway instances.
type GatewayClientsProvider interface {
GatewayClients() []*adminapi.Client
GatewayClientsCount() int
}

// SetupAnonymousReports sets up and starts the anonymous reporting and returns
// a cleanup function and an error.
// The caller is responsible to call the returned function - when the returned
// error is not nil - to stop the reports sending.
func SetupAnonymousReports(
ctx context.Context,
kubeCfg *rest.Config,
clientsProvider dataplane.AdminAPIClientsProvider,
clientsProvider GatewayClientsProvider,
rv ReportValues,
) (func(), error) {
// if anonymous reports are enabled this helps provide Kong with insights about usage of the ingress controller
Expand Down Expand Up @@ -61,7 +67,7 @@ func SetupAnonymousReports(
"id": uuid.NewString(), // universal unique identifier for this system
}

tMgr, err := CreateManager(kubeCfg, fixedPayload, rv)
tMgr, err := CreateManager(kubeCfg, clientsProvider, fixedPayload, rv)
if err != nil {
return nil, fmt.Errorf("failed to create anonymous reports manager: %w", err)
}
Expand Down
63 changes: 63 additions & 0 deletions internal/manager/telemetry/workflows/gateway_discovery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package workflows

import (
"context"
"errors"
"fmt"

"github.com/kong/kubernetes-telemetry/pkg/provider"
"github.com/kong/kubernetes-telemetry/pkg/telemetry"
"github.com/kong/kubernetes-telemetry/pkg/types"
)

const GatewayDiscoveryWorkflowName = "gateway_discovery"

// DiscoveredGatewaysCounter is an interface that allows to count currently discovered Gateways.
type DiscoveredGatewaysCounter interface {
GatewayClientsCount() int
}

func NewGatewayDiscoveryWorkflow(gatewaysCounter DiscoveredGatewaysCounter) (telemetry.Workflow, error) {
w := telemetry.NewWorkflow(GatewayDiscoveryWorkflowName)

discoveredGatewaysCountProvider, err := NewDiscoveredGatewaysCountProvider(gatewaysCounter)
if err != nil {
return nil, fmt.Errorf("failed to create discovered gateways count provider: %w", err)
}
w.AddProvider(discoveredGatewaysCountProvider)

return w, nil
}

// DiscoveredGatewaysCountProvider is a provider that reports the number of currently discovered Gateways.
type DiscoveredGatewaysCountProvider struct {
counter DiscoveredGatewaysCounter
}

func NewDiscoveredGatewaysCountProvider(counter DiscoveredGatewaysCounter) (*DiscoveredGatewaysCountProvider, error) {
if counter == nil {
return nil, errors.New("discovered gateways counter is required")
}

return &DiscoveredGatewaysCountProvider{counter: counter}, nil
}

const (
DiscoveredGatewaysCountProviderName = "discovered_gateways_count"
DiscoveredGatewaysCountProviderKind = provider.Kind(DiscoveredGatewaysCountProviderName)
DiscoveredGatewaysCountKey = types.ProviderReportKey(DiscoveredGatewaysCountProviderName)
)

func (d *DiscoveredGatewaysCountProvider) Name() string {
return DiscoveredGatewaysCountProviderName
}

func (d *DiscoveredGatewaysCountProvider) Kind() provider.Kind {
return DiscoveredGatewaysCountProviderKind
}

func (d *DiscoveredGatewaysCountProvider) Provide(context.Context) (types.ProviderReport, error) {
return types.ProviderReport{
DiscoveredGatewaysCountKey: d.counter.GatewayClientsCount(),
}, nil
}