Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
GGabriele committed Feb 8, 2024
1 parent cb810f5 commit 3b35be2
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 10 deletions.
45 changes: 36 additions & 9 deletions pkg/file/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (b *stateBuilder) consumerGroups() {

for _, consumer := range cg.Consumers {
if consumer != nil {
c, err := b.ingestConsumerGroupConsumer(&FConsumer{
c, err := b.ingestConsumerGroupConsumer(cg.ID, &FConsumer{
Consumer: *consumer,
})
if err != nil {
Expand Down Expand Up @@ -308,7 +308,7 @@ func (b *stateBuilder) caCertificates() {
}
}

func (b *stateBuilder) ingestConsumerGroupConsumer(c *FConsumer) (*kong.Consumer, error) {
func (b *stateBuilder) ingestConsumerGroupConsumer(cgID *string, c *FConsumer) (*kong.Consumer, error) {
var (
consumer *state.Consumer
err error
Expand Down Expand Up @@ -364,6 +364,15 @@ func (b *stateBuilder) ingestConsumerGroupConsumer(c *FConsumer) (*kong.Consumer
if err != nil {
return nil, err
}
err = b.intermediate.ConsumerGroupConsumers.Add(state.ConsumerGroupConsumer{
ConsumerGroupConsumer: kong.ConsumerGroupConsumer{
ConsumerGroup: &kong.ConsumerGroup{ID: cgID},
Consumer: &c.Consumer,
},
})
if err != nil {
return nil, err
}
return &c.Consumer, nil
}

Expand Down Expand Up @@ -417,17 +426,35 @@ func (b *stateBuilder) consumers() {
if consumer != nil {
c.Consumer.CreatedAt = consumer.CreatedAt
}
b.rawState.Consumers = append(b.rawState.Consumers, &c.Consumer)
err = b.intermediate.Consumers.Add(state.Consumer{Consumer: c.Consumer})

// check if consumer was already added in the consumer groups section.
// if it was, we don't want to add it again.
consumerAlreadyAdded := false
consumerGroupConsumers, err := b.intermediate.ConsumerGroupConsumers.GetAll()
if err != nil {
b.err = err
return
}

// ingest consumer into consumer group
if err := b.ingestIntoConsumerGroup(c); err != nil {
b.err = err
return
for _, cgc := range consumerGroupConsumers {
if cgc.Consumer != nil && (c.ID != nil && *cgc.Consumer.ID == *c.ID ||
c.Username != nil && cgc.Consumer.Username != nil && *cgc.Consumer.Username == *c.Username ||
c.CustomID != nil && cgc.Consumer.CustomID != nil && *cgc.Consumer.CustomID == *c.CustomID) {
consumerAlreadyAdded = true
break
}
}
if !consumerAlreadyAdded {
b.rawState.Consumers = append(b.rawState.Consumers, &c.Consumer)
err = b.intermediate.Consumers.Add(state.Consumer{Consumer: c.Consumer})
if err != nil {
b.err = err
return
}
// ingest consumer into consumer group
if err := b.ingestIntoConsumerGroup(c); err != nil {
b.err = err
return
}
}

// plugins for the Consumer
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5137,7 +5137,7 @@ func Test_Sync_DeDupPluginsScopedToConsumerGroups(t *testing.T) {
// - konnect
func Test_Sync_ConsumerGroupConsumerFromUpstream(t *testing.T) {
t.Setenv("DECK_KONNECT_CONTROL_PLANE_NAME", "default")
runWhenEnterpriseOrKonnect(t, ">=3.5.0")
runWhenEnterpriseOrKonnect(t, ">=3.0.0")
setup(t)

client, err := getTestClient()
Expand Down

0 comments on commit 3b35be2

Please sign in to comment.