/
aws.go
35 lines (29 loc) · 857 Bytes
/
aws.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package aws
import (
"context"
"github.com/benthosdev/benthos/v4/internal/impl/kafka"
"github.com/benthosdev/benthos/v4/public/service"
"github.com/twmb/franz-go/pkg/sasl"
kaws "github.com/twmb/franz-go/pkg/sasl/aws"
sess "github.com/benthosdev/benthos/v4/internal/impl/aws"
)
func init() {
kafka.AWSSASLFromConfigFn = func(c *service.ParsedConfig) (sasl.Mechanism, error) {
awsSession, err := sess.GetSession(c.Namespace("aws"))
if err != nil {
return nil, err
}
creds := awsSession.Config.Credentials
return kaws.ManagedStreamingIAM(func(ctx context.Context) (kaws.Auth, error) {
val, err := creds.GetWithContext(ctx)
if err != nil {
return kaws.Auth{}, err
}
return kaws.Auth{
AccessKey: val.AccessKeyID,
SecretKey: val.SecretAccessKey,
SessionToken: val.SessionToken,
}, nil
}), nil
}
}