Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 2 additions & 22 deletions internal/pkg/cmd/prerunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,32 +463,12 @@ func getKafkaRestEndpoint(ctx *dynamicconfig.DynamicContext) (string, string, er
return "", "", nil
}

clusterConfig, err := ctx.GetKafkaClusterForCommand()
config, err := ctx.GetKafkaClusterForCommand()
if err != nil {
return "", "", err
}
if clusterConfig.RestEndpoint != "" {
return clusterConfig.RestEndpoint, clusterConfig.ID, nil
}

// if clusterConfig.RestEndpoint is empty, fetch the cluster to ensure config isn't just out of date
// potentially remove this once Rest Proxy is enabled across prod
kafkaCluster, err := ctx.FetchCluster(clusterConfig.ID)
if err != nil {
return "", "", err
}

// no need to update the config if it's still empty
if kafkaCluster.RestEndpoint == "" {
return "", clusterConfig.ID, nil
}

// update config to have updated cluster if rest endpoint is no longer ""
clusterConfig = dynamicconfig.KafkaClusterToKafkaClusterConfig(kafkaCluster, clusterConfig.APIKeys)
ctx.KafkaClusterContext.AddKafkaClusterConfig(clusterConfig)
err = ctx.Save()

return kafkaCluster.RestEndpoint, clusterConfig.ID, err
return config.RestEndpoint, config.ID, err
}

// Converts a ccloud base URL to the appropriate Metrics URL.
Expand Down
5 changes: 2 additions & 3 deletions internal/pkg/cmd/prerunner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"strings"
"testing"

launchdarkly "github.com/confluentinc/cli/internal/pkg/featureflags"

flowv1 "github.com/confluentinc/cc-structs/kafka/flow/v1"
orgv1 "github.com/confluentinc/cc-structs/kafka/org/v1"
"github.com/confluentinc/ccloud-sdk-go-v1"
Expand All @@ -23,6 +21,7 @@ import (
pcmd "github.com/confluentinc/cli/internal/pkg/cmd"
v1 "github.com/confluentinc/cli/internal/pkg/config/v1"
"github.com/confluentinc/cli/internal/pkg/errors"
"github.com/confluentinc/cli/internal/pkg/featureflags"
"github.com/confluentinc/cli/internal/pkg/form"
"github.com/confluentinc/cli/internal/pkg/log"
pmock "github.com/confluentinc/cli/internal/pkg/mock"
Expand Down Expand Up @@ -116,7 +115,7 @@ func getPreRunBase() *pcmd.PreRun {
}

func TestPreRun_Anonymous_SetLoggingLevel(t *testing.T) {
launchdarkly.Init(nil, true)
featureflags.Init(nil, true)

tests := map[string]log.Level{
"": log.ERROR,
Expand Down
34 changes: 27 additions & 7 deletions internal/pkg/config/v1/kafka_cluster_config.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,33 @@
package v1

import (
"strings"
"time"

schedv1 "github.com/confluentinc/cc-structs/kafka/scheduler/v1"
)

// KafkaClusterConfig represents a connection to a Kafka cluster.
type KafkaClusterConfig struct {
ID string `json:"id" hcl:"id"`
Name string `json:"name" hcl:"name"`
Bootstrap string `json:"bootstrap_servers" hcl:"bootstrap_servers"`
APIEndpoint string `json:"api_endpoint,omitempty" hcl:"api_endpoint"`
APIKeys map[string]*APIKeyPair `json:"api_keys" hcl:"api_keys"`
ID string `json:"id" hcl:"id"`
Name string `json:"name" hcl:"name"`
Bootstrap string `json:"bootstrap_servers" hcl:"bootstrap_servers"`
APIEndpoint string `json:"api_endpoint,omitempty" hcl:"api_endpoint"`
RestEndpoint string `json:"rest_endpoint,omitempty" hcl:"rest_endpoint"`
APIKeys map[string]*APIKeyPair `json:"api_keys" hcl:"api_keys"`
// APIKey is your active api key for this cluster and references a key in the APIKeys map
APIKey string `json:"api_key,omitempty" hcl:"api_key"`
RestEndpoint string `json:"rest_endpoint,omitempty" hcl:"rest_endpoint"`
APIKey string `json:"api_key,omitempty" hcl:"api_key"`
LastUpdate time.Time `json:"last_update,omitempty" hcl:"last_update"`
}

func NewKafkaClusterConfig(cluster *schedv1.KafkaCluster) *KafkaClusterConfig {
return &KafkaClusterConfig{
ID: cluster.Id,
Name: cluster.Name,
Bootstrap: strings.TrimPrefix(cluster.Endpoint, "SASL_SSL://"),
APIEndpoint: cluster.ApiEndpoint,
RestEndpoint: cluster.RestEndpoint,
APIKeys: make(map[string]*APIKeyPair),
LastUpdate: time.Now(),
}
}
4 changes: 2 additions & 2 deletions internal/pkg/config/v1/kafka_cluster_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ func (k *KafkaClusterContext) GetKafkaClusterConfig(clusterId string) *KafkaClus
if !k.EnvContext {
return k.KafkaClusterConfigs[clusterId]
}
kafkaEnvContext := k.GetCurrentKafkaEnvContext()
return kafkaEnvContext.KafkaClusterConfigs[clusterId]

return k.GetCurrentKafkaEnvContext().KafkaClusterConfigs[clusterId]
}

func (k *KafkaClusterContext) AddKafkaClusterConfig(kcc *KafkaClusterConfig) {
Expand Down
40 changes: 15 additions & 25 deletions internal/pkg/config/v1/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package v1

import (
"fmt"
"time"

orgv1 "github.com/confluentinc/cc-structs/kafka/org/v1"

Expand Down Expand Up @@ -163,32 +164,29 @@ func AuthenticatedConfigMock(params mockConfigParams) *Config {
}

func createUsernameCredential(credentialName string, auth *AuthConfig) *Credential {
credential := &Credential{
return &Credential{
Name: credentialName,
Username: auth.User.Email,
CredentialType: Username,
}
return credential
}

func createAPIKeyCredential(credentialName string, apiKeyPair *APIKeyPair) *Credential {
credential := &Credential{
return &Credential{
Name: credentialName,
APIKeyPair: apiKeyPair,
CredentialType: APIKey,
}
return credential
}
func createPlatform(name, server string) *Platform {
platform := &Platform{
return &Platform{
Name: name,
Server: server,
}
return platform
}

func createAuthConfig(userId int32, email string, userResourceId string, envId string, organizationId int32, orgResourceId string) *AuthConfig {
auth := &AuthConfig{
func createAuthConfig(userId int32, email, userResourceId, envId string, organizationId int32, orgResourceId string) *AuthConfig {
return &AuthConfig{
User: &orgv1.User{
Id: userId,
Email: email,
Expand All @@ -199,50 +197,42 @@ func createAuthConfig(userId int32, email string, userResourceId string, envId s
Id: organizationId,
ResourceId: orgResourceId,
},
Accounts: []*orgv1.Account{
{Id: envId},
},
Accounts: []*orgv1.Account{{Id: envId}},
}
return auth
}

func createContextState(authConfig *AuthConfig, authToken string) *ContextState {
contextState := &ContextState{
return &ContextState{
Auth: authConfig,
AuthToken: authToken,
}
return contextState
}

func createAPIKeyPair(apiKey, apiSecret string) *APIKeyPair {
keyPair := &APIKeyPair{
return &APIKeyPair{
Key: apiKey,
Secret: apiSecret,
}
return keyPair
}

func createKafkaCluster(clusterID string, clusterName string, apiKeyPair *APIKeyPair) *KafkaClusterConfig {
cluster := &KafkaClusterConfig{
func createKafkaCluster(clusterID, clusterName string, apiKeyPair *APIKeyPair) *KafkaClusterConfig {
return &KafkaClusterConfig{
ID: clusterID,
Name: clusterName,
Bootstrap: bootstrapServer,
APIEndpoint: kafkaApiEndpoint,
APIKeys: map[string]*APIKeyPair{
apiKeyPair.Key: apiKeyPair,
},
APIKey: apiKeyPair.Key,
APIKeys: map[string]*APIKeyPair{apiKeyPair.Key: apiKeyPair},
APIKey: apiKeyPair.Key,
LastUpdate: time.Now(),
}
return cluster
}

func createSRCluster(apiKeyPair *APIKeyPair) *SchemaRegistryCluster {
cluster := &SchemaRegistryCluster{
return &SchemaRegistryCluster{
Id: srClusterId,
SchemaRegistryEndpoint: srEndpoint,
SrCredentials: apiKeyPair,
}
return cluster
}

func setUpConfig(conf *Config, ctx *Context, platform *Platform, credential *Credential, contextState *ContextState) {
Expand Down
3 changes: 2 additions & 1 deletion internal/pkg/config/v1/test_json/account_overwrite.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@
"api_secret": "def-secret-456"
}
},
"api_key": "abc-key-123"
"api_key": "abc-key-123",
"last_update": "0001-01-01T00:00:00Z"
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion internal/pkg/config/v1/test_json/stateful_cloud.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@
"api_secret": "def-secret-456"
}
},
"api_key": "abc-key-123"
"api_key": "abc-key-123",
"last_update": "0001-01-01T00:00:00Z"
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion internal/pkg/config/v1/test_json/stateful_onprem.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@
"api_secret": "def-secret-456"
}
},
"api_key": "abc-key-123"
"api_key": "abc-key-123",
"last_update": "0001-01-01T00:00:00Z"
}
}
},
Expand Down
3 changes: 2 additions & 1 deletion internal/pkg/config/v1/test_json/stateless_cloud.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@
"api_secret": "def-secret-456"
}
},
"api_key": "abc-key-123"
"api_key": "abc-key-123",
"last_update": "0001-01-01T00:00:00Z"
}
}
},
Expand Down
3 changes: 2 additions & 1 deletion internal/pkg/config/v1/test_json/stateless_onprem.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@
"api_secret": "def-secret-456"
}
},
"api_key": "abc-key-123"
"api_key": "abc-key-123",
"last_update": "0001-01-01T00:00:00Z"
}
}
},
Expand Down
12 changes: 6 additions & 6 deletions internal/pkg/dynamic-config/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,18 @@ import (
)

func (d *DynamicContext) FetchCluster(clusterId string) (*schedv1.KafkaCluster, error) {
envId, err := d.AuthenticatedEnvId()
environmentId, err := d.AuthenticatedEnvId()
if err != nil {
return nil, err
}

req := &schedv1.KafkaCluster{AccountId: envId, Id: clusterId}
cluster, err := d.Client.Kafka.Describe(context.Background(), req)
if err != nil {
return nil, errors.CatchKafkaNotFoundError(err, clusterId)
cluster := &schedv1.KafkaCluster{
AccountId: environmentId,
Id: clusterId,
}

return cluster, nil
cluster, err = d.Client.Kafka.Describe(context.Background(), cluster)
return cluster, errors.CatchKafkaNotFoundError(err, clusterId)
}

func (d *DynamicContext) FetchAPIKeyError(apiKey string, clusterID string) error {
Expand Down
16 changes: 12 additions & 4 deletions internal/pkg/dynamic-config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,23 @@ import (
schedv1 "github.com/confluentinc/cc-structs/kafka/scheduler/v1"
)

// KafkaCluster creates an schedv1 struct from the Kafka cluster of the current context.
// KafkaCluster creates a schedv1 struct from the Kafka cluster of the current context.
func KafkaCluster(ctx *DynamicContext) (*schedv1.KafkaCluster, error) {
kcc, err := ctx.GetKafkaClusterForCommand()
environmentId, err := ctx.AuthenticatedEnvId()
if err != nil {
return nil, err
}
envId, err := ctx.AuthenticatedEnvId()

config, err := ctx.GetKafkaClusterForCommand()
if err != nil {
return nil, err
}
return &schedv1.KafkaCluster{AccountId: envId, Id: kcc.ID, ApiEndpoint: kcc.APIEndpoint}, nil

cluster := &schedv1.KafkaCluster{
AccountId: environmentId,
Id: config.ID,
ApiEndpoint: config.APIEndpoint,
}

return cluster, nil
}
32 changes: 12 additions & 20 deletions internal/pkg/dynamic-config/dynamic_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@ package dynamicconfig
import (
"context"
"fmt"
"strings"
"time"

orgv1 "github.com/confluentinc/cc-structs/kafka/org/v1"
"github.com/confluentinc/ccloud-sdk-go-v1"

schedv1 "github.com/confluentinc/cc-structs/kafka/scheduler/v1"
"github.com/confluentinc/ccloud-sdk-go-v1"
"github.com/spf13/cobra"

"github.com/confluentinc/cli/internal/pkg/ccloudv2"
Expand Down Expand Up @@ -101,6 +100,7 @@ func (d *DynamicContext) GetKafkaClusterForCommand() (*v1.KafkaClusterConfig, er
if d.KafkaClusterContext == nil {
return nil, errors.NewErrorWithSuggestions(errors.NoKafkaSelectedErrorMsg, errors.NoKafkaSelectedSuggestions)
}

clusterId := d.KafkaClusterContext.GetActiveKafkaClusterId()
if clusterId == "" {
return nil, errors.NewErrorWithSuggestions(errors.NoKafkaSelectedErrorMsg, errors.NoKafkaSelectedSuggestions)
Expand All @@ -111,36 +111,28 @@ func (d *DynamicContext) GetKafkaClusterForCommand() (*v1.KafkaClusterConfig, er
}

func (d *DynamicContext) FindKafkaCluster(clusterId string) (*v1.KafkaClusterConfig, error) {
if cluster := d.KafkaClusterContext.GetKafkaClusterConfig(clusterId); cluster != nil {
return cluster, nil
if config := d.KafkaClusterContext.GetKafkaClusterConfig(clusterId); config != nil {
const week = 7 * 24 * time.Hour
if time.Now().Before(config.LastUpdate.Add(week)) {
return config, nil
}
}

if d.Client == nil {
return nil, errors.Errorf(errors.FindKafkaNoClientErrorMsg, clusterId)
}

// Resolve cluster details if not found locally.
kcc, err := d.FetchCluster(clusterId)
cluster, err := d.FetchCluster(clusterId)
if err != nil {
return nil, err
}

cluster := KafkaClusterToKafkaClusterConfig(kcc, make(map[string]*v1.APIKeyPair))
d.KafkaClusterContext.AddKafkaClusterConfig(cluster)
config := v1.NewKafkaClusterConfig(cluster)
d.KafkaClusterContext.AddKafkaClusterConfig(config)
err = d.Save()

return cluster, err
}

func KafkaClusterToKafkaClusterConfig(kcc *schedv1.KafkaCluster, apiKeys map[string]*v1.APIKeyPair) *v1.KafkaClusterConfig {
return &v1.KafkaClusterConfig{
ID: kcc.Id,
Name: kcc.Name,
Bootstrap: strings.TrimPrefix(kcc.Endpoint, "SASL_SSL://"),
APIEndpoint: kcc.ApiEndpoint,
APIKeys: apiKeys,
RestEndpoint: kcc.RestEndpoint,
}
return config, err
}

func (d *DynamicContext) SetActiveKafkaCluster(clusterId string) error {
Expand Down
Loading