Skip to content

Commit

Permalink
Fixups from #3324 (#3419)
Browse files Browse the repository at this point in the history
Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
  • Loading branch information
ItalyPaleAle committed May 21, 2024
1 parent eb82293 commit 93cf5cb
Show file tree
Hide file tree
Showing 12 changed files with 51 additions and 75 deletions.
17 changes: 10 additions & 7 deletions bindings/postgres/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,20 @@ builtinAuthenticationProfiles:
description: |
Must be set to `true` to enable the component to retrieve access tokens from AWS IAM.
This authentication method only works with AWS Relational Database Service for PostgreSQL databases.
- name: connectionString
required: true
sensitive: true
description: |
The connection string for the PostgreSQL database
This must contain the user, which corresponds to the name of the user created inside PostgreSQL that maps to the AWS IAM policy. This connection string should not contain any password. Note that the database name field is denoted by dbname with AWS.
example: |
"host=mydb.postgres.database.aws.com user=myapplication port=5432 dbname=dapr_test sslmode=require"
type: string
- name: awsRegion
type: string
required: true
description: |
The AWS Region where the MSK Kafka broker is deployed to.
The AWS Region where the AWS Relational Database Service is deployed to.
example: '"us-east-1"'
- name: awsAccessKey
type: string
Expand All @@ -66,12 +75,6 @@ builtinAuthenticationProfiles:
description: |
The secret key associated with the access key.
example: '"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"'
- name: awsSessionToken
type: string
sensitive: true
description: |
AWS session token to use. A session token is only required if you are using temporary security credentials.
example: '"TOKEN"'
authenticationProfiles:
- title: "Connection string"
description: "Authenticate using a Connection String"
Expand Down
2 changes: 1 addition & 1 deletion bindings/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (p *Postgres) Init(ctx context.Context, meta bindings.Metadata) error {
return err
}

poolConfig, err := m.GetPgxPoolConfig(ctx)
poolConfig, err := m.GetPgxPoolConfig()
if err != nil {
return err
}
Expand Down
12 changes: 5 additions & 7 deletions common/authentication/aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,12 @@ func NewEnvironmentSettings(md map[string]string) (EnvironmentSettings, error) {

type AWSIAM struct {
// Ignored by metadata parser because included in built-in authentication profile
// access key to use for accessing postgresql.
// Access key to use for accessing PostgreSQL.
AWSAccessKey string `json:"awsAccessKey" mapstructure:"awsAccessKey"`
// secret key to use for accessing postgresql.
// Secret key to use for accessing PostgreSQL.
AWSSecretKey string `json:"awsSecretKey" mapstructure:"awsSecretKey"`
// aws session token to use.
AWSSessionToken string `mapstructure:"awsSessionToken"`
// aws region in which postgresql should create resources.
AWSRegion string `mapstructure:"awsRegion"`
// AWS region in which PostgreSQL is deployed.
AWSRegion string `json:"awsRegion" mapstructure:"awsRegion"`
}

type AWSIAMAuthOptions struct {
Expand Down Expand Up @@ -138,7 +136,7 @@ func (opts *AWSIAMAuthOptions) GetAccessToken(ctx context.Context) (string, erro
return authenticationToken, nil
}

func (opts *AWSIAMAuthOptions) InitiateAWSIAMAuth(ctx context.Context) error {
func (opts *AWSIAMAuthOptions) InitiateAWSIAMAuth() error {
// Set max connection lifetime to 8 minutes in postgres connection pool configuration.
// Note: this will refresh connections before the 15 min expiration on the IAM AWS auth token,
// while leveraging the BeforeConnect hook to recreate the token in time dynamically.
Expand Down
20 changes: 12 additions & 8 deletions common/authentication/postgresql/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,13 @@ func (m *PostgresAuthMetadata) InitWithMetadata(meta map[string]string, opts Ini
}
switch {
case opts.AzureADEnabled && m.UseAzureAD:
// Populate the Azure environment if using Azure AD
m.azureEnv, err = azure.NewEnvironmentSettings(meta)
if err != nil {
return err
}
case opts.AWSIAMEnabled && m.UseAWSIAM:
// Populate the AWS environment if using AWS IAM
m.awsEnv, err = aws.NewEnvironmentSettings(meta)
if err != nil {
return err
Expand Down Expand Up @@ -101,7 +103,7 @@ func (m *PostgresAuthMetadata) ValidateAwsIamFields() (string, string, string, e
}

// GetPgxPoolConfig returns the pgxpool.Config object that contains the credentials for connecting to PostgreSQL.
func (m *PostgresAuthMetadata) GetPgxPoolConfig(ctx context.Context) (*pgxpool.Config, error) {
func (m *PostgresAuthMetadata) GetPgxPoolConfig() (*pgxpool.Config, error) {
// Get the config from the connection string
config, err := pgxpool.ParseConfig(m.ConnectionString)
if err != nil {
Expand Down Expand Up @@ -129,8 +131,9 @@ func (m *PostgresAuthMetadata) GetPgxPoolConfig(ctx context.Context) (*pgxpool.C
}
}

// Check if we should use Azure AD
if m.UseAzureAD {
switch {
case m.UseAzureAD:
// Use Azure AD
tokenCred, errToken := m.azureEnv.GetTokenCredential()
if errToken != nil {
return nil, errToken
Expand All @@ -155,11 +158,11 @@ func (m *PostgresAuthMetadata) GetPgxPoolConfig(ctx context.Context) (*pgxpool.C
cc.Password = at.Token
return nil
}
}
if m.UseAWSIAM {
case m.UseAWSIAM:
// We should use AWS IAM
awsRegion, awsAccessKey, awsSecretKey, err := m.ValidateAwsIamFields()
if err != nil {
err = fmt.Errorf("failed to validate AWS IAM authentication fields: %v", err)
err = fmt.Errorf("failed to validate AWS IAM authentication fields: %w", err)
return nil, err
}

Expand All @@ -171,11 +174,12 @@ func (m *PostgresAuthMetadata) GetPgxPoolConfig(ctx context.Context) (*pgxpool.C
SecretKey: awsSecretKey,
}

err = awsOpts.InitiateAWSIAMAuth(ctx)
err = awsOpts.InitiateAWSIAMAuth()
if err != nil {
err = fmt.Errorf("failed to initiate AWS IAM authentication rotation dynamically: %v", err)
err = fmt.Errorf("failed to initiate AWS IAM authentication rotation: %w", err)
return nil, err
}
}

return config, nil
}
2 changes: 1 addition & 1 deletion common/component/postgresql/v1/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (p *PostgreSQL) Init(ctx context.Context, meta state.Metadata) error {
return fmt.Errorf("failed to parse metadata: %w", err)
}

config, err := p.metadata.GetPgxPoolConfig(ctx)
config, err := p.metadata.GetPgxPoolConfig()
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions common/component/postgresql/v1/postgresql_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (p *PostgreSQLQuery) Query(parentCtx context.Context, req *state.QueryReque
if err := qbuilder.BuildQuery(&req.Query); err != nil {
return &state.QueryResponse{}, err
}
data, token, err := q.execute(parentCtx, p.logger, p.db)
data, token, err := q.execute(parentCtx, p.db)
if err != nil {
return &state.QueryResponse{}, err
}
Expand Down Expand Up @@ -259,7 +259,7 @@ func (q *Query) Finalize(filters string, qq *query.Query) error {
return nil
}

func (q *Query) execute(ctx context.Context, logger logger.Logger, db pginterfaces.DBQuerier) ([]state.QueryItem, string, error) {
func (q *Query) execute(ctx context.Context, db pginterfaces.DBQuerier) ([]state.QueryItem, string, error) {
rows, err := db.Query(ctx, q.query, q.params...)
if err != nil {
return nil, "", err
Expand Down
17 changes: 10 additions & 7 deletions configuration/postgres/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,20 @@ builtinAuthenticationProfiles:
description: |
Must be set to `true` to enable the component to retrieve access tokens from AWS IAM.
This authentication method only works with AWS Relational Database Service for PostgreSQL databases.
- name: connectionString
required: true
sensitive: true
description: |
The connection string for the PostgreSQL database
This must contain the user, which corresponds to the name of the user created inside PostgreSQL that maps to the AWS IAM policy. This connection string should not contain any password. Note that the database name field is denoted by dbname with AWS.
example: |
"host=mydb.postgres.database.aws.com user=myapplication port=5432 dbname=dapr_test sslmode=require"
type: string
- name: awsRegion
type: string
required: true
description: |
The AWS Region where the MSK Kafka broker is deployed to.
The AWS Region where the AWS Relational Database Service is deployed to.
example: '"us-east-1"'
- name: awsAccessKey
type: string
Expand All @@ -56,12 +65,6 @@ builtinAuthenticationProfiles:
description: |
The secret key associated with the access key.
example: '"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"'
- name: awsSessionToken
type: string
sensitive: true
description: |
AWS session token to use. A session token is only required if you are using temporary security credentials.
example: '"TOKEN"'
authenticationProfiles:
- title: "Connection string"
description: "Authenticate using a Connection String."
Expand Down
6 changes: 3 additions & 3 deletions configuration/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (p *ConfigurationStore) Init(ctx context.Context, metadata configuration.Me
}

p.ActiveSubscriptions = make(map[string]*subscription)
p.client, err = p.connectDB(ctx, p.metadata.ConnectionString)
p.client, err = p.connectDB(ctx)
if err != nil {
return fmt.Errorf("error connecting to configuration store: '%w'", err)
}
Expand Down Expand Up @@ -285,8 +285,8 @@ func (p *ConfigurationStore) handleSubscribedChange(ctx context.Context, handler
}
}

func (p *ConfigurationStore) connectDB(ctx context.Context, connStr string) (*pgxpool.Pool, error) {
config, err := p.metadata.GetPgxPoolConfig(ctx)
func (p *ConfigurationStore) connectDB(ctx context.Context) (*pgxpool.Pool, error) {
config, err := p.metadata.GetPgxPoolConfig()
if err != nil {
return nil, fmt.Errorf("PostgreSQL configuration store connection error: %s", err)
}
Expand Down
6 changes: 0 additions & 6 deletions state/postgresql/v1/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,6 @@ builtinAuthenticationProfiles:
description: |
The secret key associated with the access key.
example: '"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"'
- name: awsSessionToken
type: string
sensitive: true
description: |
AWS session token to use. A session token is only required if you are using temporary security credentials.
example: '"TOKEN"'
authenticationProfiles:
- title: "Connection string"
description: "Authenticate using a Connection String"
Expand Down
6 changes: 0 additions & 6 deletions state/postgresql/v2/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,6 @@ builtinAuthenticationProfiles:
description: |
The secret key associated with the access key.
example: '"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"'
- name: awsSessionToken
type: string
sensitive: true
description: |
AWS session token to use. A session token is only required if you are using temporary security credentials.
example: '"TOKEN"'
authenticationProfiles:
- title: "Connection string"
description: "Authenticate using a Connection String"
Expand Down
32 changes: 6 additions & 26 deletions state/postgresql/v2/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"github.com/dapr/components-contrib/state"
stateutils "github.com/dapr/components-contrib/state/utils"
"github.com/dapr/kit/logger"
"github.com/dapr/kit/utils"
)

// PostgreSQL state store.
Expand All @@ -50,8 +49,7 @@ type PostgreSQL struct {
gc sqlinternal.GarbageCollector

enableAzureAD bool

enableAWSIAM bool
enableAWSIAM bool
}

type Options struct {
Expand All @@ -71,13 +69,6 @@ func NewPostgreSQLStateStore(logger logger.Logger) state.Store {
return NewPostgreSQLStateStoreWithOptions(logger, Options{})
}

// NewPostgreSQLStateStoreWithTrueOptions creates a new instance of PostgreSQL state store v2 with Azure AD authentication and AWS IAM authentication disabled.
// The v2 of the component uses a different format for storing data, always in a BYTEA column, which is more efficient than the JSONB column used in v1.
// Additionally, v2 uses random UUIDs for etags instead of the xmin column, expanding support to all Postgres-compatible databases such as CockroachDB, etc.
func NewPostgreSQLStateStoreWithTrueOptions(logger logger.Logger) state.Store {
return NewPostgreSQLStateStoreWithOptions(logger, Options{NoAWSIAM: true, NoAzureAD: true})
}

// NewPostgreSQLStateStoreWithOptions creates a new instance of PostgreSQL state store with options.
func NewPostgreSQLStateStoreWithOptions(logger logger.Logger, opts Options) state.Store {
s := &PostgreSQL{
Expand All @@ -90,36 +81,25 @@ func NewPostgreSQLStateStoreWithOptions(logger logger.Logger, opts Options) stat
}

// Init sets up Postgres connection and performs migrations
func (p *PostgreSQL) Init(ctx context.Context, meta state.Metadata) error {
var (
useAWS bool
useAzure bool
err error
)

awsIam, _ := metadata.GetMetadataProperty(meta.Properties, "UseAWSIAM")
useAWS = utils.IsTruthy(awsIam)
azureAd, _ := metadata.GetMetadataProperty(meta.Properties, "UseAzureAD")
useAzure = utils.IsTruthy(azureAd)

func (p *PostgreSQL) Init(ctx context.Context, meta state.Metadata) (err error) {
opts := pgauth.InitWithMetadataOpts{
AzureADEnabled: useAzure,
AWSIAMEnabled: useAWS,
AzureADEnabled: p.enableAzureAD,
AWSIAMEnabled: p.enableAWSIAM,
}

err = p.metadata.InitWithMetadata(meta, opts)
if err != nil {
return err
}

config, err := p.metadata.GetPgxPoolConfig(ctx)
config, err := p.metadata.GetPgxPoolConfig()
if err != nil {
return err
}

connCtx, connCancel := context.WithTimeout(ctx, p.metadata.Timeout)
p.db, err = pgxpool.NewWithConfig(connCtx, config)
defer connCancel()
connCancel()
if err != nil {
err = fmt.Errorf("failed to connect to the database: %w", err)
return err
Expand Down
2 changes: 1 addition & 1 deletion tests/utils/configupdater/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (r *ConfigUpdater) Init(props map[string]string) error {
return fmt.Errorf("missing postgreSQL configuration table name")
}

config, err := md.GetPgxPoolConfig(ctx)
config, err := md.GetPgxPoolConfig()
if err != nil {
return fmt.Errorf("postgres configuration store connection error : %w", err)
}
Expand Down

0 comments on commit 93cf5cb

Please sign in to comment.