Skip to content

Commit

Permalink
feat: Consumer Group can be configured (#4387)
Browse files Browse the repository at this point in the history
  • Loading branch information
programmer04 committed Jul 27, 2023
1 parent 4b4fcab commit 0aef048
Show file tree
Hide file tree
Showing 10 changed files with 213 additions and 1 deletion.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ Adding a new version? You'll need three changes:
### Added

- **WIP** Introduce `KongConsumerGroup` CRD (supported by Kong Enterprise only)
[#4325](https://github.com/Kong/kubernetes-ingress-controller/pull/4325)
[#4325](https://github.com/Kong/kubernetes-ingress-controller/pull/4325), [#4387](https://github.com/Kong/kubernetes-ingress-controller/pull/4387)
- The ResponseHeaderModifier Gateway API filter is now supported and translated
to the proper set of Kong plugins.
[#4350](https://github.com/Kong/kubernetes-ingress-controller/pull/4350)
Expand Down
8 changes: 8 additions & 0 deletions internal/dataplane/deckgen/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,14 @@ func ToDeckContent(
PluginString(content.Plugins[j])) > 0
})

for _, cg := range k8sState.ConsumerGroups {
consumerGroup := file.FConsumerGroupObject{ConsumerGroup: cg.ConsumerGroup}
content.ConsumerGroups = append(content.ConsumerGroups, consumerGroup)
}
sort.SliceStable(content.ConsumerGroups, func(i, j int) bool {
return strings.Compare(*content.ConsumerGroups[i].Name, *content.ConsumerGroups[j].Name) > 0
})

for _, u := range k8sState.Upstreams {
fillUpstream(&u.Upstream)
upstream := file.FUpstream{Upstream: u.Upstream}
Expand Down
8 changes: 8 additions & 0 deletions internal/dataplane/kongstate/consumergroup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package kongstate

import "github.com/kong/go-kong/kong"

// ConsumerGroup holds a Kong Consumer.
type ConsumerGroup struct {
kong.ConsumerGroup
}
20 changes: 20 additions & 0 deletions internal/dataplane/kongstate/kongstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type KongState struct {
Licenses []kong.License
Plugins []Plugin
Consumers []Consumer
ConsumerGroups []ConsumerGroup
}

// SanitizedCopy returns a shallow copy with sensitive values redacted best-effort.
Expand Down Expand Up @@ -159,6 +160,17 @@ func (ks *KongState) FillConsumersAndCredentials(
}
}

func (ks *KongState) FillConsumerGroups(_ logrus.FieldLogger, s store.Storer) {
for _, cg := range s.ListKongConsumerGroups() {
ks.ConsumerGroups = append(ks.ConsumerGroups, ConsumerGroup{
ConsumerGroup: kong.ConsumerGroup{
Name: kong.String(cg.Name),
Tags: util.GenerateTagsForObject(cg),
},
})
}
}

func (ks *KongState) FillOverrides(log logrus.FieldLogger, s store.Storer) {
for i := 0; i < len(ks.Services); i++ {
// Services
Expand Down Expand Up @@ -396,4 +408,12 @@ func (ks *KongState) FillIDs(logger logrus.FieldLogger) {
ks.Consumers[consumerIndex] = consumer
}
}

for consumerGroupIndex, consumerGroup := range ks.ConsumerGroups {
if err := consumerGroup.FillID(); err != nil {
logger.WithError(err).Errorf("failed to fill ID for consumer group %s", *consumerGroup.Name)
} else {
ks.ConsumerGroups[consumerGroupIndex] = consumerGroup
}
}
}
3 changes: 3 additions & 0 deletions internal/dataplane/parser/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,9 @@ func (p *Parser) BuildKongConfig() KongConfigBuildingResult {
p.registerSuccessfullyParsedObject(pl.K8sParent)
}

// process consumer groups
result.FillConsumerGroups(p.logger, p.storer)

// generate Certificates and SNIs
ingressCerts := p.getCerts(ingressRules.SecretNameToSNIs)
gatewayCerts := p.getGatewayCerts()
Expand Down
11 changes: 11 additions & 0 deletions internal/store/fake_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type FakeObjects struct {
KongClusterPlugins []*configurationv1.KongClusterPlugin
KongIngresses []*configurationv1.KongIngress
KongConsumers []*configurationv1.KongConsumer
KongConsumerGroups []*configurationv1beta1.KongConsumerGroup

KnativeIngresses []*knative.Ingress
}
Expand Down Expand Up @@ -179,6 +180,13 @@ func NewFakeStore(
return nil, err
}
}
consumerGroupStore := cache.NewStore(keyFunc)
for _, c := range objects.KongConsumerGroups {
err := consumerGroupStore.Add(c)
if err != nil {
return nil, err
}
}
kongPluginsStore := cache.NewStore(keyFunc)
for _, p := range objects.KongPlugins {
err := kongPluginsStore.Add(p)
Expand Down Expand Up @@ -220,6 +228,7 @@ func NewFakeStore(
Plugin: kongPluginsStore,
ClusterPlugin: kongClusterPluginsStore,
Consumer: consumerStore,
ConsumerGroup: consumerGroupStore,
KongIngress: kongIngressStore,
IngressClassParametersV1alpha1: IngressClassParametersV1alpha1Store,
KnativeIngress: knativeIngressStore,
Expand Down Expand Up @@ -258,6 +267,7 @@ func (objects FakeObjects) MarshalToYAML() ([]byte, error) {
reflect.TypeOf(&configurationv1.KongClusterPlugin{}): configurationv1.SchemeGroupVersion.WithKind("KongClusterPlugin"),
reflect.TypeOf(&configurationv1.KongIngress{}): configurationv1.SchemeGroupVersion.WithKind("KongIngress"),
reflect.TypeOf(&configurationv1.KongConsumer{}): configurationv1.SchemeGroupVersion.WithKind("KongConsumer"),
reflect.TypeOf(&configurationv1beta1.KongConsumerGroup{}): configurationv1beta1.SchemeGroupVersion.WithKind("KongConsumerGroup"),
}

out := &bytes.Buffer{}
Expand Down Expand Up @@ -299,6 +309,7 @@ func (objects FakeObjects) MarshalToYAML() ([]byte, error) {
allObjects = append(allObjects, lo.ToAnySlice(objects.KongClusterPlugins)...)
allObjects = append(allObjects, lo.ToAnySlice(objects.KongIngresses)...)
allObjects = append(allObjects, lo.ToAnySlice(objects.KongConsumers)...)
allObjects = append(allObjects, lo.ToAnySlice(objects.KongConsumerGroups)...)

for _, obj := range allObjects {
if err := fillGVKAndAppendToBuffer(obj.(runtime.Object)); err != nil {
Expand Down
29 changes: 29 additions & 0 deletions internal/store/fake_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,35 @@ func TestFakeStoreConsumer(t *testing.T) {
assert.True(errors.As(err, &ErrNotFound{}))
}

func TestFakeStoreConsumerGroup(t *testing.T) {
assert := assert.New(t)
require := require.New(t)

consumerGroups := []*configurationv1beta1.KongConsumerGroup{
{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: "default",
Annotations: map[string]string{
annotations.IngressClassKey: annotations.DefaultIngressClass,
},
},
},
}
store, err := NewFakeStore(FakeObjects{KongConsumerGroups: consumerGroups})
require.Nil(err)
require.NotNil(store)
assert.Len(store.ListKongConsumerGroups(), 1)
c, err := store.GetKongConsumerGroup("default", "foo")
assert.Nil(err)
assert.NotNil(c)

c, err = store.GetKongConsumerGroup("default", "does-not-exist")
assert.Nil(c)
assert.NotNil(err)
assert.True(errors.As(err, &ErrNotFound{}))
}

func TestFakeStorePlugins(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
Expand Down
39 changes: 39 additions & 0 deletions internal/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type Storer interface {
GetKongPlugin(namespace, name string) (*kongv1.KongPlugin, error)
GetKongClusterPlugin(name string) (*kongv1.KongClusterPlugin, error)
GetKongConsumer(namespace, name string) (*kongv1.KongConsumer, error)
GetKongConsumerGroup(namespace, name string) (*kongv1beta1.KongConsumerGroup, error)
GetIngressClassName() string
GetIngressClassV1(name string) (*netv1.IngressClass, error)
GetIngressClassParametersV1Alpha1(ingressClass *netv1.IngressClass) (*kongv1alpha1.IngressClassParameters, error)
Expand All @@ -99,6 +100,7 @@ type Storer interface {
ListKongPlugins() []*kongv1.KongPlugin
ListKongClusterPlugins() []*kongv1.KongClusterPlugin
ListKongConsumers() []*kongv1.KongConsumer
ListKongConsumerGroups() []*kongv1beta1.KongConsumerGroup
ListCACerts() ([]*corev1.Secret, error)
}

Expand Down Expand Up @@ -143,6 +145,7 @@ type CacheStores struct {
Plugin cache.Store
ClusterPlugin cache.Store
Consumer cache.Store
ConsumerGroup cache.Store
KongIngress cache.Store
TCPIngress cache.Store
UDPIngress cache.Store
Expand Down Expand Up @@ -175,6 +178,7 @@ func NewCacheStores() CacheStores {
Plugin: cache.NewStore(keyFunc),
ClusterPlugin: cache.NewStore(clusterResourceKeyFunc),
Consumer: cache.NewStore(keyFunc),
ConsumerGroup: cache.NewStore(keyFunc),
KongIngress: cache.NewStore(keyFunc),
TCPIngress: cache.NewStore(keyFunc),
UDPIngress: cache.NewStore(keyFunc),
Expand Down Expand Up @@ -273,6 +277,8 @@ func (c CacheStores) Get(obj runtime.Object) (item interface{}, exists bool, err
return c.ClusterPlugin.Get(obj)
case *kongv1.KongConsumer:
return c.Consumer.Get(obj)
case *kongv1beta1.KongConsumerGroup:
return c.ConsumerGroup.Get(obj)
case *kongv1.KongIngress:
return c.KongIngress.Get(obj)
case *kongv1beta1.TCPIngress:
Expand Down Expand Up @@ -336,6 +342,8 @@ func (c CacheStores) Add(obj runtime.Object) error {
return c.ClusterPlugin.Add(obj)
case *kongv1.KongConsumer:
return c.Consumer.Add(obj)
case *kongv1beta1.KongConsumerGroup:
return c.ConsumerGroup.Add(obj)
case *kongv1.KongIngress:
return c.KongIngress.Add(obj)
case *kongv1beta1.TCPIngress:
Expand Down Expand Up @@ -400,6 +408,8 @@ func (c CacheStores) Delete(obj runtime.Object) error {
return c.ClusterPlugin.Delete(obj)
case *kongv1.KongConsumer:
return c.Consumer.Delete(obj)
case *kongv1beta1.KongConsumerGroup:
return c.ConsumerGroup.Delete(obj)
case *kongv1.KongIngress:
return c.KongIngress.Delete(obj)
case *kongv1beta1.TCPIngress:
Expand Down Expand Up @@ -804,6 +814,19 @@ func (s Store) GetKongConsumer(namespace, name string) (*kongv1.KongConsumer, er
return p.(*kongv1.KongConsumer), nil
}

// GetKongConsumerGroup returns the 'name' KongConsumerGroup resource in namespace.
func (s Store) GetKongConsumerGroup(namespace, name string) (*kongv1beta1.KongConsumerGroup, error) {
key := fmt.Sprintf("%v/%v", namespace, name)
p, exists, err := s.stores.ConsumerGroup.GetByKey(key)
if err != nil {
return nil, err
}
if !exists {
return nil, ErrNotFound{fmt.Sprintf("KongConsumerGroup %v not found", key)}
}
return p.(*kongv1beta1.KongConsumerGroup), nil
}

func (s Store) GetIngressClassName() string {
return s.ingressClass
}
Expand Down Expand Up @@ -890,6 +913,20 @@ func (s Store) ListKongConsumers() []*kongv1.KongConsumer {
return consumers
}

// ListKongConsumerGroups returns all KongConsumerGroups filtered by the ingress.class
// annotation.
func (s Store) ListKongConsumerGroups() []*kongv1beta1.KongConsumerGroup {
var consumerGroups []*kongv1beta1.KongConsumerGroup
for _, item := range s.stores.ConsumerGroup.List() {
c, ok := item.(*kongv1beta1.KongConsumerGroup)
if ok && s.isValidIngressClass(&c.ObjectMeta, annotations.IngressClassKey, s.getIngressClassHandling()) {
consumerGroups = append(consumerGroups, c)
}
}

return consumerGroups
}

// ListGlobalKongPlugins returns all KongPlugin resources
// filtered by the ingress.class annotation and with the
// label global:"true".
Expand Down Expand Up @@ -1066,6 +1103,8 @@ func mkObjFromGVK(gvk schema.GroupVersionKind) (runtime.Object, error) {
return &kongv1.KongClusterPlugin{}, nil
case kongv1.SchemeGroupVersion.WithKind("KongConsumer"):
return &kongv1.KongConsumer{}, nil
case kongv1beta1.SchemeGroupVersion.WithKind("KongConsumerGroup"):
return &kongv1beta1.KongConsumerGroup{}, nil
case kongv1alpha1.SchemeGroupVersion.WithKind("IngressClassParameters"):
return &kongv1alpha1.IngressClassParameters{}, nil
// ----------------------------------------------------------------------------
Expand Down
5 changes: 5 additions & 0 deletions internal/versions/versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ var (
// PluginOrderingVersionCutoff is the Kong version prior to the addition of plugin ordering.
PluginOrderingVersionCutoff = semver.Version{Major: 3}

// ConsumerGroupsVersionCutoff is the Kong version prior to the addition of Consumer Groups as first class citizens.
// TODO: Change to 3.4 once it's released. For now let's assume it's 3.3 (some features work), but for the full
// support it should be 3.4 (unreleased yet).
ConsumerGroupsVersionCutoff = semver.Version{Major: 3, Minor: 3}

// MTLSCredentialVersionCutoff is the minimum Kong version that support mTLS credentials. This is a patch version
// because the original version of the mTLS credential was not compatible with KIC.
MTLSCredentialVersionCutoff = semver.Version{Major: 2, Minor: 3, Patch: 2}
Expand Down
89 changes: 89 additions & 0 deletions test/integration/consumer_group_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
//go:build integration_tests

package integration

import (
"context"
"fmt"
"io"
"net/http"
"testing"

"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/kong/kubernetes-ingress-controller/v2/internal/annotations"
"github.com/kong/kubernetes-ingress-controller/v2/internal/versions"
kongv1beta1 "github.com/kong/kubernetes-ingress-controller/v2/pkg/apis/configuration/v1beta1"
"github.com/kong/kubernetes-ingress-controller/v2/pkg/clientset"
"github.com/kong/kubernetes-ingress-controller/v2/test/consts"
"github.com/kong/kubernetes-ingress-controller/v2/test/internal/helpers"
"github.com/kong/kubernetes-ingress-controller/v2/test/internal/testenv"
)

func TestConsumerGroup(t *testing.T) {
t.Parallel()

RunWhenKongVersion(t, fmt.Sprintf(">=%s", versions.ConsumerGroupsVersionCutoff))
RunWhenKongEnterprise(t)

ctx := context.Background()
ns, cleaner := helpers.Setup(ctx, t, env)

c, err := clientset.NewForConfig(env.Cluster().Config())
require.NoError(t, err)

const consumerGroupName = "test-consumer-group"
t.Logf("configuring Consumer Group: %q", consumerGroupName)
cg, err := c.ConfigurationV1beta1().KongConsumerGroups(ns.Name).Create(
ctx,
&kongv1beta1.KongConsumerGroup{
ObjectMeta: metav1.ObjectMeta{
Name: consumerGroupName,
Namespace: ns.Name,
Annotations: map[string]string{
annotations.IngressClassKey: consts.IngressClass,
},
},
},
metav1.CreateOptions{},
)
require.NoError(t, err)
cleaner.Add(cg)

t.Logf("validating that Consumer Group: %q was successfully configured", consumerGroupName)
require.Eventually(t, func() bool {
cgPath := fmt.Sprintf("/consumer_groups/%s", consumerGroupName)
var headers map[string]string
if testenv.DBMode() != testenv.DBModeOff {
cgPath = fmt.Sprintf("/%s/consumer_groups/%s", consts.KongTestWorkspace, consumerGroupName)
headers = map[string]string{
"Kong-Admin-Token": consts.KongTestPassword,
}
}
req := helpers.MustHTTPRequest(t, http.MethodGet, proxyAdminURL, cgPath, headers)
resp, err := helpers.DefaultHTTPClient().Do(req)
if err != nil {
t.Logf("WARNING: error while waiting for %s: %v", resp.Request.URL, err)
return false
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
t.Logf("WARNING: error while reading response from %s: %v", resp.Request.URL, err)
}
switch resp.StatusCode {
case http.StatusOK:
return true
case http.StatusForbidden:
t.Logf(
"WARNING: it seems Kong Gateway Enterprise hasn't got a valid license passed - from: %s received: %s with body: %s",
resp.Request.URL, resp.Status, body,
)
return false
default:
t.Logf("WARNING: from: %s received unexpected: %s with body: %s", resp.Request.URL, resp.Status, body)
return false
}
}, ingressWait, waitTick)
}

0 comments on commit 0aef048

Please sign in to comment.