Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 30 additions & 10 deletions aws/secret.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,22 @@ func NewSecret(name string, cfg aws.Config) Secret {
}

func (s Secret) GetSecret(ctx context.Context) (map[string]interface{}, error) {
payload, err := s.GetSecretPayload(ctx)
if err != nil {
return nil, err
}

var v map[string]interface{}
if err := json.Unmarshal(payload, &v); err != nil {
return nil, fmt.Errorf("cannot decode secret string as map[string]interface{}: %s", err)
}
if v == nil {
return nil, fmt.Errorf("secret value is 'null' literal")
}
return v, nil
}

func (s Secret) GetSecretPayload(ctx context.Context) ([]byte, error) {
input := &secretsmanager.GetSecretValueInput{
SecretId: aws.String(s.name),
VersionStage: aws.String("AWSCURRENT"),
Expand All @@ -35,18 +51,22 @@ func (s Secret) GetSecret(ctx context.Context) (map[string]interface{}, error) {
if err != nil {
return nil, fmt.Errorf("Secrets Manager API error: %s", err)
}
blip.Debug("DEBUG: aws secret: %+v", *sv)

if sv.SecretString == nil || *sv.SecretString == "" {
return nil, fmt.Errorf("secret string is nil or empty")
name := ""
if sv.Name != nil {
name = *sv.Name
}
versionID := ""
if sv.VersionId != nil {
versionID = *sv.VersionId
}
blip.Debug("DEBUG: aws secret: name=%s version=%s", name, versionID)

var v map[string]interface{}
if err := json.Unmarshal([]byte(*sv.SecretString), &v); err != nil {
return nil, fmt.Errorf("cannot decode secret string as map[string]string: %s", err)
if sv.SecretString != nil && *sv.SecretString != "" {
return []byte(*sv.SecretString), nil
}
if v == nil {
return nil, fmt.Errorf("secret value is 'null' literal")
if len(sv.SecretBinary) > 0 {
return append([]byte(nil), sv.SecretBinary...), nil
}
return v, nil

return nil, fmt.Errorf("secret string and secret binary are empty")
}
54 changes: 54 additions & 0 deletions blip.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package blip
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"log"
"math"
Expand Down Expand Up @@ -98,6 +99,55 @@ type SinkFactoryArgs struct {
Tags map[string]string // config.monitor.tags
}

// DbCredentials are MySQL credentials parsed or loaded for a connection.
type DbCredentials struct {
Username string
Password string
TLS ConfigTLS
}

// PasswordSecretParser maps an AWS Secrets Manager payload to database credentials.
// The DbCredentials argument is pre-populated with config defaults and can be modified
// by the parser. The payload is the raw SecretString or SecretBinary value.
type PasswordSecretParser func(context.Context, ConfigMonitor, []byte, *DbCredentials) error

// DefaultPasswordSecretParser parses the default AWS RDS secret shape:
// "password" is required, and "username" is optional.
func DefaultPasswordSecretParser(_ context.Context, cfg ConfigMonitor, payload []byte, credentials *DbCredentials) error {
if credentials == nil {
return fmt.Errorf("credentials destination is nil")
}
credentials.Username = cfg.Username

var secretPayload map[string]interface{}
if err := json.Unmarshal(payload, &secretPayload); err != nil {
return fmt.Errorf("cannot decode secret as JSON object: %s", err)
}
if secretPayload == nil {
return fmt.Errorf("secret value is 'null' literal")
}

username, ok := secretPayload["username"]
if ok {
usernameStr, ok := username.(string)
if ok {
credentials.Username = usernameStr
}
}

password, ok := secretPayload["password"]
if !ok {
return fmt.Errorf("error retrieving 'password' value of secret")
}
passwordStr, ok := password.(string)
if !ok {
return fmt.Errorf("invalid type for 'password' value of secret")
}
credentials.Password = passwordStr

return nil
}

// Plugins are function callbacks that override specific functionality of Blip.
// Plugins are optional, but if specified it overrides the built-in functionality.
type Plugins struct {
Expand All @@ -123,6 +173,10 @@ type Plugins struct {
// ModifyDB modifies the *sql.DB connection pool. Use with caution.
ModifyDB func(*sql.DB, string)

// ParsePasswordSecret maps an AWS Secrets Manager payload to MySQL credentials.
// If nil, Blip uses DefaultPasswordSecretParser.
ParsePasswordSecret PasswordSecretParser

// StartMonitor allows a monitor to start by returning true. Else the monitor
// is loaded but not started. This is used to load all monitors but start only
// certain monitors.
Expand Down
117 changes: 64 additions & 53 deletions dbconn/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,33 @@ var portSuffix = regexp.MustCompile(`:\d+$`)

// factory is the internal implementation of blip.DbFactory.
type factory struct {
awsConfig blip.AWSConfigFactory
modifyDB func(*sql.DB, string)
awsConfig blip.AWSConfigFactory
modifyDB func(*sql.DB, string)
passwordSecretParser blip.PasswordSecretParser
}

// ConnFactoryOption configures the default MySQL connection factory.
type ConnFactoryOption func(*factory)

// WithPasswordSecretParser sets the parser used for AWS Secrets Manager
// password-secret payloads.
func WithPasswordSecretParser(parser blip.PasswordSecretParser) ConnFactoryOption {
return func(f *factory) {
f.passwordSecretParser = parser
}
}

// NewConnFactory returns a blip.NewConnFactory that connects to MySQL.
// This is the only blip.NewConnFactor. It is created in Server.Defaults.
func NewConnFactory(awsConfig blip.AWSConfigFactory, modifyDB func(*sql.DB, string)) factory {
return factory{
func NewConnFactory(awsConfig blip.AWSConfigFactory, modifyDB func(*sql.DB, string), opts ...ConnFactoryOption) factory {
f := factory{
awsConfig: awsConfig,
modifyDB: modifyDB,
}
for _, opt := range opts {
opt(&f)
}
return f
}

// Make makes a *sql.DB for the given monitor config. On success, it also returns
Expand Down Expand Up @@ -141,7 +157,7 @@ func (f factory) Make(cfg blip.ConfigMonitor) (*sql.DB, string, error) {
// TLS is configured, so make sure we reload it when the credentials are reloaded in case
// it was changed
origCredentialFunc := credentialFunc
credentialFunc = func(ctx context.Context) (Credentials, error) {
credentialFunc = func(ctx context.Context) (blip.DbCredentials, error) {
creds, err := origCredentialFunc(ctx)
if err != nil {
return creds, err
Expand Down Expand Up @@ -267,13 +283,13 @@ func (f factory) Credentials(cfg blip.ConfigMonitor) (CredentialFunc, error) {
return nil, err
}
token := aws.NewAuthToken(cfg.Username, cfg.Hostname, awscfg)
return func(ctx context.Context) (Credentials, error) {
return func(ctx context.Context) (blip.DbCredentials, error) {
passwd, err := token.Password(ctx)
if err != nil {
return Credentials{}, err
return blip.DbCredentials{}, err
}

return Credentials{
return blip.DbCredentials{
Password: passwd,
Username: cfg.Username,
}, nil
Expand All @@ -282,52 +298,18 @@ func (f factory) Credentials(cfg blip.ConfigMonitor) (CredentialFunc, error) {

// Amazon Secrets Manager, could be rotated
if cfg.AWS.PasswordSecret != "" {
blip.Debug("%s: AWS Secrets Manager password", cfg.MonitorId)
awscfg, err := f.awsConfig.Make(blip.AWS{Region: cfg.AWS.Region}, cfg.Hostname)
if err != nil {
return nil, err
}
secret := aws.NewSecret(cfg.AWS.PasswordSecret, awscfg)
return func(ctx context.Context) (Credentials, error) {
newSecret, err := secret.GetSecret(ctx)
if err != nil {
return Credentials{}, err
}

username, ok := newSecret["username"]
if !ok {
// The username key is optional. Default to config
username = cfg.Username
}
usernameStr, ok := username.(string)
if !ok {
username = cfg.Username
}
password, ok := newSecret["password"]
if !ok {
return Credentials{}, fmt.Errorf("error retrieving 'password' value of secret")
}
passwordStr, ok := password.(string)
if !ok {
return Credentials{}, fmt.Errorf("invalid type for 'password' value of secret")
}

return Credentials{
Password: passwordStr,
Username: usernameStr,
}, nil
}, nil
return f.passwordSecretCredentialFunc(cfg)
}

// Password file, could be "rotated" (new password written to file)
if cfg.PasswordFile != "" {
blip.Debug("%s: password file", cfg.MonitorId)
return func(context.Context) (Credentials, error) {
return func(context.Context) (blip.DbCredentials, error) {
bytes, err := os.ReadFile(cfg.PasswordFile)
if err != nil {
return Credentials{}, err
return blip.DbCredentials{}, err
}
return Credentials{
return blip.DbCredentials{
Password: string(bytes),
Username: cfg.Username,
}, err
Expand All @@ -337,12 +319,12 @@ func (f factory) Credentials(cfg blip.ConfigMonitor) (CredentialFunc, error) {
// Credentials in my.cnf file, could be rotated (username and/or password, along with TLS config)
if cfg.MyCnf != "" {
blip.Debug("%s my.cnf credentials", cfg.MonitorId)
return func(context.Context) (Credentials, error) {
return func(context.Context) (blip.DbCredentials, error) {
cfg, tlscfg, err := ParseMyCnf(cfg.MyCnf)
if err != nil {
return Credentials{}, err
return blip.DbCredentials{}, err
}
return Credentials{
return blip.DbCredentials{
Password: cfg.Password,
Username: cfg.Username,
TLS: tlscfg,
Expand All @@ -353,14 +335,43 @@ func (f factory) Credentials(cfg blip.ConfigMonitor) (CredentialFunc, error) {
// Static password in Blip config file, not rotated
if cfg.Password != "" {
blip.Debug("%s: static password credentials", cfg.MonitorId)
return func(context.Context) (Credentials, error) {
return Credentials{Password: cfg.Password, Username: cfg.Username}, nil
return func(context.Context) (blip.DbCredentials, error) {
return blip.DbCredentials{Password: cfg.Password, Username: cfg.Username}, nil
}, nil
}

blip.Debug("%s: no password", cfg.MonitorId)
return func(context.Context) (Credentials, error) {
return Credentials{Password: "", Username: cfg.Username}, nil
return func(context.Context) (blip.DbCredentials, error) {
return blip.DbCredentials{Password: "", Username: cfg.Username}, nil
}, nil
}

func (f factory) passwordSecretCredentialFunc(cfg blip.ConfigMonitor) (CredentialFunc, error) {
blip.Debug("%s: AWS Secrets Manager password", cfg.MonitorId)
awscfg, err := f.awsConfig.Make(blip.AWS{Region: cfg.AWS.Region}, cfg.Hostname)
if err != nil {
return nil, err
}
secret := aws.NewSecret(cfg.AWS.PasswordSecret, awscfg)
parser := f.passwordSecretParser
if parser == nil {
parser = blip.DefaultPasswordSecretParser
}

return func(ctx context.Context) (blip.DbCredentials, error) {
payload, err := secret.GetSecretPayload(ctx)
if err != nil {
return blip.DbCredentials{}, err
}

credentials := blip.DbCredentials{
Username: cfg.Username,
}
if err := parser(ctx, cfg, payload, &credentials); err != nil {
return blip.DbCredentials{}, err
}

return credentials, nil
}, nil
}

Expand Down
Loading
Loading