From 87cdeb56fe0dd19e607aa1c86c0b73dba5069e63 Mon Sep 17 00:00:00 2001 From: Brian Strauch Date: Fri, 24 Jun 2022 17:07:36 -0700 Subject: [PATCH] add ttl to kafka cluster config struct --- internal/pkg/cmd/prerunner.go | 24 +------- internal/pkg/cmd/prerunner_test.go | 5 +- .../pkg/config/v1/kafka_cluster_config.go | 34 ++++++++--- .../pkg/config/v1/kafka_cluster_context.go | 4 +- internal/pkg/config/v1/mock.go | 40 +++++-------- .../v1/test_json/account_overwrite.json | 3 +- .../config/v1/test_json/stateful_cloud.json | 3 +- .../config/v1/test_json/stateful_onprem.json | 3 +- .../config/v1/test_json/stateless_cloud.json | 3 +- .../config/v1/test_json/stateless_onprem.json | 3 +- internal/pkg/dynamic-config/client.go | 12 ++-- internal/pkg/dynamic-config/config.go | 16 ++++-- .../pkg/dynamic-config/dynamic_context.go | 32 ++++------- .../dynamic-config/dynamic_context_test.go | 56 ++++++++++++++++++- 14 files changed, 142 insertions(+), 96 deletions(-) diff --git a/internal/pkg/cmd/prerunner.go b/internal/pkg/cmd/prerunner.go index cf7bfaa5ec..36e56769c8 100644 --- a/internal/pkg/cmd/prerunner.go +++ b/internal/pkg/cmd/prerunner.go @@ -463,32 +463,12 @@ func getKafkaRestEndpoint(ctx *dynamicconfig.DynamicContext) (string, string, er return "", "", nil } - clusterConfig, err := ctx.GetKafkaClusterForCommand() + config, err := ctx.GetKafkaClusterForCommand() if err != nil { return "", "", err } - if clusterConfig.RestEndpoint != "" { - return clusterConfig.RestEndpoint, clusterConfig.ID, nil - } - - // if clusterConfig.RestEndpoint is empty, fetch the cluster to ensure config isn't just out of date - // potentially remove this once Rest Proxy is enabled across prod - kafkaCluster, err := ctx.FetchCluster(clusterConfig.ID) - if err != nil { - return "", "", err - } - - // no need to update the config if it's still empty - if kafkaCluster.RestEndpoint == "" { - return "", clusterConfig.ID, nil - } - - // update config to have updated cluster if rest endpoint is no longer "" - clusterConfig = dynamicconfig.KafkaClusterToKafkaClusterConfig(kafkaCluster, clusterConfig.APIKeys) - ctx.KafkaClusterContext.AddKafkaClusterConfig(clusterConfig) - err = ctx.Save() - return kafkaCluster.RestEndpoint, clusterConfig.ID, err + return config.RestEndpoint, config.ID, err } // Converts a ccloud base URL to the appropriate Metrics URL. diff --git a/internal/pkg/cmd/prerunner_test.go b/internal/pkg/cmd/prerunner_test.go index 0e3ec136cc..4c9c106599 100644 --- a/internal/pkg/cmd/prerunner_test.go +++ b/internal/pkg/cmd/prerunner_test.go @@ -8,8 +8,6 @@ import ( "strings" "testing" - launchdarkly "github.com/confluentinc/cli/internal/pkg/featureflags" - flowv1 "github.com/confluentinc/cc-structs/kafka/flow/v1" orgv1 "github.com/confluentinc/cc-structs/kafka/org/v1" "github.com/confluentinc/ccloud-sdk-go-v1" @@ -23,6 +21,7 @@ import ( pcmd "github.com/confluentinc/cli/internal/pkg/cmd" v1 "github.com/confluentinc/cli/internal/pkg/config/v1" "github.com/confluentinc/cli/internal/pkg/errors" + "github.com/confluentinc/cli/internal/pkg/featureflags" "github.com/confluentinc/cli/internal/pkg/form" "github.com/confluentinc/cli/internal/pkg/log" pmock "github.com/confluentinc/cli/internal/pkg/mock" @@ -116,7 +115,7 @@ func getPreRunBase() *pcmd.PreRun { } func TestPreRun_Anonymous_SetLoggingLevel(t *testing.T) { - launchdarkly.Init(nil, true) + featureflags.Init(nil, true) tests := map[string]log.Level{ "": log.ERROR, diff --git a/internal/pkg/config/v1/kafka_cluster_config.go b/internal/pkg/config/v1/kafka_cluster_config.go index 40aabac403..4765765b33 100644 --- a/internal/pkg/config/v1/kafka_cluster_config.go +++ b/internal/pkg/config/v1/kafka_cluster_config.go @@ -1,13 +1,33 @@ package v1 +import ( + "strings" + "time" + + schedv1 "github.com/confluentinc/cc-structs/kafka/scheduler/v1" +) + // KafkaClusterConfig represents a connection to a Kafka cluster. type KafkaClusterConfig struct { - ID string `json:"id" hcl:"id"` - Name string `json:"name" hcl:"name"` - Bootstrap string `json:"bootstrap_servers" hcl:"bootstrap_servers"` - APIEndpoint string `json:"api_endpoint,omitempty" hcl:"api_endpoint"` - APIKeys map[string]*APIKeyPair `json:"api_keys" hcl:"api_keys"` + ID string `json:"id" hcl:"id"` + Name string `json:"name" hcl:"name"` + Bootstrap string `json:"bootstrap_servers" hcl:"bootstrap_servers"` + APIEndpoint string `json:"api_endpoint,omitempty" hcl:"api_endpoint"` + RestEndpoint string `json:"rest_endpoint,omitempty" hcl:"rest_endpoint"` + APIKeys map[string]*APIKeyPair `json:"api_keys" hcl:"api_keys"` // APIKey is your active api key for this cluster and references a key in the APIKeys map - APIKey string `json:"api_key,omitempty" hcl:"api_key"` - RestEndpoint string `json:"rest_endpoint,omitempty" hcl:"rest_endpoint"` + APIKey string `json:"api_key,omitempty" hcl:"api_key"` + LastUpdate time.Time `json:"last_update,omitempty" hcl:"last_update"` +} + +func NewKafkaClusterConfig(cluster *schedv1.KafkaCluster) *KafkaClusterConfig { + return &KafkaClusterConfig{ + ID: cluster.Id, + Name: cluster.Name, + Bootstrap: strings.TrimPrefix(cluster.Endpoint, "SASL_SSL://"), + APIEndpoint: cluster.ApiEndpoint, + RestEndpoint: cluster.RestEndpoint, + APIKeys: make(map[string]*APIKeyPair), + LastUpdate: time.Now(), + } } diff --git a/internal/pkg/config/v1/kafka_cluster_context.go b/internal/pkg/config/v1/kafka_cluster_context.go index e19758faf7..82e62725a7 100644 --- a/internal/pkg/config/v1/kafka_cluster_context.go +++ b/internal/pkg/config/v1/kafka_cluster_context.go @@ -84,8 +84,8 @@ func (k *KafkaClusterContext) GetKafkaClusterConfig(clusterId string) *KafkaClus if !k.EnvContext { return k.KafkaClusterConfigs[clusterId] } - kafkaEnvContext := k.GetCurrentKafkaEnvContext() - return kafkaEnvContext.KafkaClusterConfigs[clusterId] + + return k.GetCurrentKafkaEnvContext().KafkaClusterConfigs[clusterId] } func (k *KafkaClusterContext) AddKafkaClusterConfig(kcc *KafkaClusterConfig) { diff --git a/internal/pkg/config/v1/mock.go b/internal/pkg/config/v1/mock.go index c238f7c2bd..c433fb9315 100644 --- a/internal/pkg/config/v1/mock.go +++ b/internal/pkg/config/v1/mock.go @@ -2,6 +2,7 @@ package v1 import ( "fmt" + "time" orgv1 "github.com/confluentinc/cc-structs/kafka/org/v1" @@ -163,32 +164,29 @@ func AuthenticatedConfigMock(params mockConfigParams) *Config { } func createUsernameCredential(credentialName string, auth *AuthConfig) *Credential { - credential := &Credential{ + return &Credential{ Name: credentialName, Username: auth.User.Email, CredentialType: Username, } - return credential } func createAPIKeyCredential(credentialName string, apiKeyPair *APIKeyPair) *Credential { - credential := &Credential{ + return &Credential{ Name: credentialName, APIKeyPair: apiKeyPair, CredentialType: APIKey, } - return credential } func createPlatform(name, server string) *Platform { - platform := &Platform{ + return &Platform{ Name: name, Server: server, } - return platform } -func createAuthConfig(userId int32, email string, userResourceId string, envId string, organizationId int32, orgResourceId string) *AuthConfig { - auth := &AuthConfig{ +func createAuthConfig(userId int32, email, userResourceId, envId string, organizationId int32, orgResourceId string) *AuthConfig { + return &AuthConfig{ User: &orgv1.User{ Id: userId, Email: email, @@ -199,50 +197,42 @@ func createAuthConfig(userId int32, email string, userResourceId string, envId s Id: organizationId, ResourceId: orgResourceId, }, - Accounts: []*orgv1.Account{ - {Id: envId}, - }, + Accounts: []*orgv1.Account{{Id: envId}}, } - return auth } func createContextState(authConfig *AuthConfig, authToken string) *ContextState { - contextState := &ContextState{ + return &ContextState{ Auth: authConfig, AuthToken: authToken, } - return contextState } func createAPIKeyPair(apiKey, apiSecret string) *APIKeyPair { - keyPair := &APIKeyPair{ + return &APIKeyPair{ Key: apiKey, Secret: apiSecret, } - return keyPair } -func createKafkaCluster(clusterID string, clusterName string, apiKeyPair *APIKeyPair) *KafkaClusterConfig { - cluster := &KafkaClusterConfig{ +func createKafkaCluster(clusterID, clusterName string, apiKeyPair *APIKeyPair) *KafkaClusterConfig { + return &KafkaClusterConfig{ ID: clusterID, Name: clusterName, Bootstrap: bootstrapServer, APIEndpoint: kafkaApiEndpoint, - APIKeys: map[string]*APIKeyPair{ - apiKeyPair.Key: apiKeyPair, - }, - APIKey: apiKeyPair.Key, + APIKeys: map[string]*APIKeyPair{apiKeyPair.Key: apiKeyPair}, + APIKey: apiKeyPair.Key, + LastUpdate: time.Now(), } - return cluster } func createSRCluster(apiKeyPair *APIKeyPair) *SchemaRegistryCluster { - cluster := &SchemaRegistryCluster{ + return &SchemaRegistryCluster{ Id: srClusterId, SchemaRegistryEndpoint: srEndpoint, SrCredentials: apiKeyPair, } - return cluster } func setUpConfig(conf *Config, ctx *Context, platform *Platform, credential *Credential, contextState *ContextState) { diff --git a/internal/pkg/config/v1/test_json/account_overwrite.json b/internal/pkg/config/v1/test_json/account_overwrite.json index ebbfae2920..9f39fba654 100644 --- a/internal/pkg/config/v1/test_json/account_overwrite.json +++ b/internal/pkg/config/v1/test_json/account_overwrite.json @@ -50,7 +50,8 @@ "api_secret": "def-secret-456" } }, - "api_key": "abc-key-123" + "api_key": "abc-key-123", + "last_update": "0001-01-01T00:00:00Z" } } } diff --git a/internal/pkg/config/v1/test_json/stateful_cloud.json b/internal/pkg/config/v1/test_json/stateful_cloud.json index 60f8d9fde8..ae53449dac 100644 --- a/internal/pkg/config/v1/test_json/stateful_cloud.json +++ b/internal/pkg/config/v1/test_json/stateful_cloud.json @@ -50,7 +50,8 @@ "api_secret": "def-secret-456" } }, - "api_key": "abc-key-123" + "api_key": "abc-key-123", + "last_update": "0001-01-01T00:00:00Z" } } } diff --git a/internal/pkg/config/v1/test_json/stateful_onprem.json b/internal/pkg/config/v1/test_json/stateful_onprem.json index 0cf5a8e7b8..c437fa1c9c 100644 --- a/internal/pkg/config/v1/test_json/stateful_onprem.json +++ b/internal/pkg/config/v1/test_json/stateful_onprem.json @@ -48,7 +48,8 @@ "api_secret": "def-secret-456" } }, - "api_key": "abc-key-123" + "api_key": "abc-key-123", + "last_update": "0001-01-01T00:00:00Z" } } }, diff --git a/internal/pkg/config/v1/test_json/stateless_cloud.json b/internal/pkg/config/v1/test_json/stateless_cloud.json index 23f54fc1f3..3be6bf5f49 100644 --- a/internal/pkg/config/v1/test_json/stateless_cloud.json +++ b/internal/pkg/config/v1/test_json/stateless_cloud.json @@ -48,7 +48,8 @@ "api_secret": "def-secret-456" } }, - "api_key": "abc-key-123" + "api_key": "abc-key-123", + "last_update": "0001-01-01T00:00:00Z" } } }, diff --git a/internal/pkg/config/v1/test_json/stateless_onprem.json b/internal/pkg/config/v1/test_json/stateless_onprem.json index 397e068d56..cae8f9b78d 100644 --- a/internal/pkg/config/v1/test_json/stateless_onprem.json +++ b/internal/pkg/config/v1/test_json/stateless_onprem.json @@ -48,7 +48,8 @@ "api_secret": "def-secret-456" } }, - "api_key": "abc-key-123" + "api_key": "abc-key-123", + "last_update": "0001-01-01T00:00:00Z" } } }, diff --git a/internal/pkg/dynamic-config/client.go b/internal/pkg/dynamic-config/client.go index 796a7251be..414aca31db 100644 --- a/internal/pkg/dynamic-config/client.go +++ b/internal/pkg/dynamic-config/client.go @@ -10,18 +10,18 @@ import ( ) func (d *DynamicContext) FetchCluster(clusterId string) (*schedv1.KafkaCluster, error) { - envId, err := d.AuthenticatedEnvId() + environmentId, err := d.AuthenticatedEnvId() if err != nil { return nil, err } - req := &schedv1.KafkaCluster{AccountId: envId, Id: clusterId} - cluster, err := d.Client.Kafka.Describe(context.Background(), req) - if err != nil { - return nil, errors.CatchKafkaNotFoundError(err, clusterId) + cluster := &schedv1.KafkaCluster{ + AccountId: environmentId, + Id: clusterId, } - return cluster, nil + cluster, err = d.Client.Kafka.Describe(context.Background(), cluster) + return cluster, errors.CatchKafkaNotFoundError(err, clusterId) } func (d *DynamicContext) FetchAPIKeyError(apiKey string, clusterID string) error { diff --git a/internal/pkg/dynamic-config/config.go b/internal/pkg/dynamic-config/config.go index 23eebc7b4b..ded6a485f8 100644 --- a/internal/pkg/dynamic-config/config.go +++ b/internal/pkg/dynamic-config/config.go @@ -4,15 +4,23 @@ import ( schedv1 "github.com/confluentinc/cc-structs/kafka/scheduler/v1" ) -// KafkaCluster creates an schedv1 struct from the Kafka cluster of the current context. +// KafkaCluster creates a schedv1 struct from the Kafka cluster of the current context. func KafkaCluster(ctx *DynamicContext) (*schedv1.KafkaCluster, error) { - kcc, err := ctx.GetKafkaClusterForCommand() + environmentId, err := ctx.AuthenticatedEnvId() if err != nil { return nil, err } - envId, err := ctx.AuthenticatedEnvId() + + config, err := ctx.GetKafkaClusterForCommand() if err != nil { return nil, err } - return &schedv1.KafkaCluster{AccountId: envId, Id: kcc.ID, ApiEndpoint: kcc.APIEndpoint}, nil + + cluster := &schedv1.KafkaCluster{ + AccountId: environmentId, + Id: config.ID, + ApiEndpoint: config.APIEndpoint, + } + + return cluster, nil } diff --git a/internal/pkg/dynamic-config/dynamic_context.go b/internal/pkg/dynamic-config/dynamic_context.go index 4fef7244b3..9b1f670973 100644 --- a/internal/pkg/dynamic-config/dynamic_context.go +++ b/internal/pkg/dynamic-config/dynamic_context.go @@ -3,12 +3,11 @@ package dynamicconfig import ( "context" "fmt" - "strings" + "time" orgv1 "github.com/confluentinc/cc-structs/kafka/org/v1" - "github.com/confluentinc/ccloud-sdk-go-v1" - schedv1 "github.com/confluentinc/cc-structs/kafka/scheduler/v1" + "github.com/confluentinc/ccloud-sdk-go-v1" "github.com/spf13/cobra" "github.com/confluentinc/cli/internal/pkg/ccloudv2" @@ -101,6 +100,7 @@ func (d *DynamicContext) GetKafkaClusterForCommand() (*v1.KafkaClusterConfig, er if d.KafkaClusterContext == nil { return nil, errors.NewErrorWithSuggestions(errors.NoKafkaSelectedErrorMsg, errors.NoKafkaSelectedSuggestions) } + clusterId := d.KafkaClusterContext.GetActiveKafkaClusterId() if clusterId == "" { return nil, errors.NewErrorWithSuggestions(errors.NoKafkaSelectedErrorMsg, errors.NoKafkaSelectedSuggestions) @@ -111,8 +111,11 @@ func (d *DynamicContext) GetKafkaClusterForCommand() (*v1.KafkaClusterConfig, er } func (d *DynamicContext) FindKafkaCluster(clusterId string) (*v1.KafkaClusterConfig, error) { - if cluster := d.KafkaClusterContext.GetKafkaClusterConfig(clusterId); cluster != nil { - return cluster, nil + if config := d.KafkaClusterContext.GetKafkaClusterConfig(clusterId); config != nil { + const week = 7 * 24 * time.Hour + if time.Now().Before(config.LastUpdate.Add(week)) { + return config, nil + } } if d.Client == nil { @@ -120,27 +123,16 @@ func (d *DynamicContext) FindKafkaCluster(clusterId string) (*v1.KafkaClusterCon } // Resolve cluster details if not found locally. - kcc, err := d.FetchCluster(clusterId) + cluster, err := d.FetchCluster(clusterId) if err != nil { return nil, err } - cluster := KafkaClusterToKafkaClusterConfig(kcc, make(map[string]*v1.APIKeyPair)) - d.KafkaClusterContext.AddKafkaClusterConfig(cluster) + config := v1.NewKafkaClusterConfig(cluster) + d.KafkaClusterContext.AddKafkaClusterConfig(config) err = d.Save() - return cluster, err -} - -func KafkaClusterToKafkaClusterConfig(kcc *schedv1.KafkaCluster, apiKeys map[string]*v1.APIKeyPair) *v1.KafkaClusterConfig { - return &v1.KafkaClusterConfig{ - ID: kcc.Id, - Name: kcc.Name, - Bootstrap: strings.TrimPrefix(kcc.Endpoint, "SASL_SSL://"), - APIEndpoint: kcc.ApiEndpoint, - APIKeys: apiKeys, - RestEndpoint: kcc.RestEndpoint, - } + return config, err } func (d *DynamicContext) SetActiveKafkaCluster(clusterId string) error { diff --git a/internal/pkg/dynamic-config/dynamic_context_test.go b/internal/pkg/dynamic-config/dynamic_context_test.go index 69b202f34b..6a1ebfe3cf 100644 --- a/internal/pkg/dynamic-config/dynamic_context_test.go +++ b/internal/pkg/dynamic-config/dynamic_context_test.go @@ -4,14 +4,17 @@ import ( "context" "fmt" "testing" + "time" + orgv1 "github.com/confluentinc/cc-structs/kafka/org/v1" + schedv1 "github.com/confluentinc/cc-structs/kafka/scheduler/v1" "github.com/confluentinc/ccloud-sdk-go-v1" "github.com/confluentinc/ccloud-sdk-go-v1/mock" - - orgv1 "github.com/confluentinc/cc-structs/kafka/org/v1" + "github.com/hashicorp/go-version" "github.com/spf13/cobra" "github.com/stretchr/testify/require" + "github.com/confluentinc/cli/internal/pkg/config" v1 "github.com/confluentinc/cli/internal/pkg/config/v1" "github.com/confluentinc/cli/internal/pkg/errors" pmock "github.com/confluentinc/cli/internal/pkg/mock" @@ -25,6 +28,55 @@ var ( apiEnvironment = "env-from-api-call" ) +func TestFindKafkaCluster_Unexpired(t *testing.T) { + update := time.Now() + + d := &DynamicContext{ + Context: &v1.Context{ + KafkaClusterContext: &v1.KafkaClusterContext{ + KafkaClusterConfigs: map[string]*v1.KafkaClusterConfig{ + "lkc-123456": {LastUpdate: update}, + }, + }, + }, + } + + config, err := d.FindKafkaCluster("lkc-123456") + require.NoError(t, err) + require.True(t, config.LastUpdate.Equal(update)) +} + +func TestFindKafkaCluster_Expired(t *testing.T) { + update := time.Now().Add(-7 * 24 * time.Hour) + + d := &DynamicContext{ + Context: &v1.Context{ + KafkaClusterContext: &v1.KafkaClusterContext{ + KafkaClusterConfigs: map[string]*v1.KafkaClusterConfig{ + "lkc-123456": {LastUpdate: update}, + }, + }, + Credential: &v1.Credential{CredentialType: v1.Username}, + State: &v1.ContextState{ + Auth: &v1.AuthConfig{Account: &orgv1.Account{Id: "env-123456"}}, + AuthToken: "token", + }, + Config: &v1.Config{BaseConfig: &config.BaseConfig{Ver: config.Version{Version: &version.Version{}}}}, + }, + Client: &ccloud.Client{ + Kafka: &mock.Kafka{ + DescribeFunc: func(ctx context.Context, cluster *schedv1.KafkaCluster) (*schedv1.KafkaCluster, error) { + return &schedv1.KafkaCluster{}, nil + }, + }, + }, + } + + config, err := d.FindKafkaCluster("lkc-123456") + require.NoError(t, err) + require.True(t, config.LastUpdate.After(update)) +} + func TestDynamicContext_ParseFlagsIntoContext(t *testing.T) { client := buildCcloudMockClient() tests := []struct {