Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: reference upstream consumer-groups relationships #57

Merged
merged 3 commits into from
Feb 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 106 additions & 7 deletions pkg/file/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,19 @@ func (b *stateBuilder) consumerGroups() {
if current != nil {
cgo.ConsumerGroup.CreatedAt = current.CreatedAt
}

for _, consumer := range cg.Consumers {
if consumer != nil {
c, err := b.ingestConsumerGroupConsumer(cg.ID, &FConsumer{
Consumer: *consumer,
})
if err != nil {
b.err = err
return
}
cgo.Consumers = append(cgo.Consumers, c)
}
}
b.rawState.ConsumerGroups = append(b.rawState.ConsumerGroups, &cgo)
}
}
Expand Down Expand Up @@ -295,6 +308,74 @@ func (b *stateBuilder) caCertificates() {
}
}

func (b *stateBuilder) ingestConsumerGroupConsumer(cgID *string, c *FConsumer) (*kong.Consumer, error) {
var (
consumer *state.Consumer
err error
)

// if the consumer is already present in the target state because it is pulled from
// upstream via the lookup tags, we don't want to create a new consumer.
for _, tc := range b.targetContent.Consumers {
stringTCTags := make([]string, len(tc.Tags))
for i, tag := range tc.Tags {
if tag != nil {
stringTCTags[i] = *tag
}
}
sort.Strings(stringTCTags)
if reflect.DeepEqual(stringTCTags, b.lookupTagsConsumers) && !utils.Empty(tc.ID) {
if (tc.Username != nil && c.Username != nil && *tc.Username == *c.Username) ||
(tc.CustomID != nil && c.CustomID != nil && *tc.CustomID == *c.CustomID) {
return &kong.Consumer{
ID: tc.ID,
Username: tc.Username,
CustomID: tc.CustomID,
Tags: tc.Tags,
}, nil
}
}
}

if c.Username != nil {
consumer, err = b.currentState.Consumers.GetByIDOrUsername(*c.Username)
}
if errors.Is(err, state.ErrNotFound) || consumer == nil {
if c.CustomID != nil {
consumer, err = b.currentState.Consumers.GetByCustomID(*c.CustomID)
}
}
if utils.Empty(c.ID) {
if errors.Is(err, state.ErrNotFound) {
c.ID = uuid()
} else if err != nil {
return nil, err
} else {
c.ID = kong.String(*consumer.ID)
}
}
utils.MustMergeTags(&c.Consumer, b.selectTags)
if consumer != nil {
c.Consumer.CreatedAt = consumer.CreatedAt
}

b.rawState.Consumers = append(b.rawState.Consumers, &c.Consumer)
err = b.intermediate.Consumers.Add(state.Consumer{Consumer: c.Consumer})
if err != nil {
return nil, err
}
err = b.intermediate.ConsumerGroupConsumers.Add(state.ConsumerGroupConsumer{
ConsumerGroupConsumer: kong.ConsumerGroupConsumer{
ConsumerGroup: &kong.ConsumerGroup{ID: cgID},
Consumer: &c.Consumer,
},
})
if err != nil {
return nil, err
}
return &c.Consumer, nil
}

func (b *stateBuilder) consumers() {
if b.err != nil {
return
Expand Down Expand Up @@ -345,17 +426,35 @@ func (b *stateBuilder) consumers() {
if consumer != nil {
c.Consumer.CreatedAt = consumer.CreatedAt
}
b.rawState.Consumers = append(b.rawState.Consumers, &c.Consumer)
err = b.intermediate.Consumers.Add(state.Consumer{Consumer: c.Consumer})

// check if consumer was already added in the consumer groups section.
// if it was, we don't want to add it again.
consumerAlreadyAdded := false
consumerGroupConsumers, err := b.intermediate.ConsumerGroupConsumers.GetAll()
if err != nil {
b.err = err
return
}

// ingest consumer into consumer group
if err := b.ingestIntoConsumerGroup(c); err != nil {
b.err = err
return
for _, cgc := range consumerGroupConsumers {
if cgc.Consumer != nil && (c.Username != nil && cgc.Consumer.Username != nil && *cgc.Consumer.Username == *c.Username ||
c.CustomID != nil && cgc.Consumer.CustomID != nil && *cgc.Consumer.CustomID == *c.CustomID) {
c.ID = cgc.Consumer.ID
consumerAlreadyAdded = true
break
}
}
if !consumerAlreadyAdded {
b.rawState.Consumers = append(b.rawState.Consumers, &c.Consumer)
err = b.intermediate.Consumers.Add(state.Consumer{Consumer: c.Consumer})
if err != nil {
b.err = err
return
}
// ingest consumer into consumer group
if err := b.ingestIntoConsumerGroup(c); err != nil {
b.err = err
return
}
}

// plugins for the Consumer
Expand Down
52 changes: 52 additions & 0 deletions tests/integration/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5131,3 +5131,55 @@ func Test_Sync_DeDupPluginsScopedToConsumerGroups(t *testing.T) {
require.NoError(t, sync("testdata/sync/030-plugin-dedup-consumer-groups/kong.yaml"))
testKongState(t, client, false, expectedState, nil)
}

// test scope:
// - 3.5.0+
// - konnect
func Test_Sync_ConsumerGroupConsumerFromUpstream(t *testing.T) {
t.Setenv("DECK_KONNECT_CONTROL_PLANE_NAME", "default")
runWhenEnterpriseOrKonnect(t, ">=3.4.0")
setup(t)

client, err := getTestClient()
if err != nil {
t.Fatalf(err.Error())
}

expectedState := utils.KongRawState{
ConsumerGroups: []*kong.ConsumerGroupObject{
{
ConsumerGroup: &kong.ConsumerGroup{
ID: kong.String("c0f6c818-470c-4df7-8515-c8e904765fcc"),
Name: kong.String("group-1"),
Tags: kong.StringSlice("project:the-project", "managed-by:deck"),
},
Consumers: []*kong.Consumer{
{
ID: kong.String("97cab250-1b0a-4119-aa2e-0756e8931034"),
Username: kong.String("consumer-1"),
Tags: kong.StringSlice("project:the-project", "managed-by:the-background-process"),
},
},
},
},
Consumers: []*kong.Consumer{
{
ID: kong.String("97cab250-1b0a-4119-aa2e-0756e8931034"),
Username: kong.String("consumer-1"),
Tags: kong.StringSlice("project:the-project", "managed-by:the-background-process"),
},
},
}

// simulate the following scenario:
// - a consumer-group defined with a set of tags, ideally managed by decK
// - a consumer defined with another set of tags, ideally managed by an external process
// - the consumer -> consumer-group relationship, ideally managed by an external process
require.NoError(t, sync("testdata/sync/031-consumer-group-consumers-from-upstream/initial.yaml"))
testKongState(t, client, false, expectedState, nil)

// referencing the relationship in a file without the consumer would still work
// if default_lookup_tags are defined to pull consumers from upstream.
require.NoError(t, sync("testdata/sync/031-consumer-group-consumers-from-upstream/consumer-groups.yaml"))
testKongState(t, client, false, expectedState, nil)
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ consumer_groups:
name: silver
consumers:
- username: bar
- username: baz
plugins:
- name: rate-limiting-advanced
config:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
_format_version: "3.0"
_info:
defaults: {}
select_tags:
- project:the-project
- managed-by:deck
default_lookup_tags:
consumers:
- managed-by:the-background-process
- project:the-project
consumer_groups:
- name: group-1
consumers:
- username: consumer-1
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
_format_version: "3.0"
consumer_groups:
- id: c0f6c818-470c-4df7-8515-c8e904765fcc
name: group-1
tags:
- project:the-project
- managed-by:deck
consumers:
- id: 97cab250-1b0a-4119-aa2e-0756e8931034
username: consumer-1
groups:
- id: c0f6c818-470c-4df7-8515-c8e904765fcc
name: group-1
tags:
- project:the-project
- managed-by:the-background-process
Loading