Commit 78d71d3

[libbeat] Cloud Connectors AWS implementation (#47587)
As discussed [here](elastic/security-team#13112), this PR introduces the cloud connectors flow into libbeat to enable its use in agentless AWS integrations (in combination with the new aws [auth method](#47260)).

Cloud connectors flow at a high level:

<img width="1391" height="563" alt="Screenshot 2025-11-12 at 10 49 01 AM" src="https://github.com/user-attachments/assets/2d0855da-e259-47d4-b294-3f755f078c23" />

* The user creates (through a CloudFormation template provided by Elastic) a role that has all the permissions required by the integration and that trusts the Elastic Global Role to assume it. (The CloudFormation template also creates a unique ID as an external ID, which the user adds to the package policy.)
* The Elastic Global Role trusts/federates with the agentless OIDC issuer ([doc](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_providers_create_oidc.html)), which means an ID token created by that issuer can be used to assume the Elastic Global Role.
* The agentless OIDC issuer provides each agentless pod with an ID token.
* The user creates an agentless agent and provides the Remote Role ARN and the External ID in the package policy (the external ID is treated as a secret).
* The Beat performs [role chaining](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles.html#iam-term-role-chaining) to the Remote Role through the Elastic Global Role (described below).
* During the last role assumption (of the user's remote role), the [source identity](https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRole.html#:~:text=Required%3A%20No-,SourceIdentity,-The%20source%20identity) is set to the Elastic cloud resource ID. The remote role's trust policy should require it as a condition, which ensures that the role can be assumed only when both the user input (role ARN, external ID) and the system-provided information (resource ID) are correct.

Example remote role trust policy condition:

```json
"Condition": {
  "StringEquals": {
    "sts:ExternalId": "randomly-generated-in-customer-csp-treated-as-secret",
    "sts:SourceIdentity": "elastic resource id (like project id in serverless)"
  }
}
```

## Proposed commit message

A new boolean flag was introduced in the `ConfigAWS` struct (`x-pack/libbeat/common/aws/credentials.go`) to indicate whether cloud connectors are used. When this setting is `true`, the initialization of the AWS credentials follows this chain:

1. Assume the Elastic Global Role [with web identity](https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRoleWithWebIdentity.html) using the ID token that the OIDC issuer has provided to the pod. The Elastic Global Role ARN and the ID token file path are set by the agentless controller as env vars in the agentless agent pods.
2. [Assume](https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRole.html) the Remote Role using the role ARN and the external ID provided by the package policy (and thus the user).
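The effect of the two-key trust policy condition can be sketched in Go. This is a simplified model for illustration only: the real evaluation happens inside AWS IAM, and the names `assumeRoleRequest` and `stringEqualsSatisfied` are hypothetical and not part of this PR. It shows that a correct external ID alone is not enough when the source identity does not match.

```go
package main

import "fmt"

// assumeRoleRequest holds the two values the trust policy condition checks.
// Hypothetical type for this sketch; it is not part of the PR.
type assumeRoleRequest struct {
	ExternalID     string // user input, taken from the package policy
	SourceIdentity string // system-provided Elastic cloud resource ID
}

// stringEqualsSatisfied reports whether every key in the condition matches
// the request exactly, mimicking a StringEquals block with two keys.
func stringEqualsSatisfied(condition map[string]string, req assumeRoleRequest) bool {
	got := map[string]string{
		"sts:ExternalId":     req.ExternalID,
		"sts:SourceIdentity": req.SourceIdentity,
	}
	for key, want := range condition {
		if got[key] != want {
			return false
		}
	}
	return true
}

func main() {
	condition := map[string]string{
		"sts:ExternalId":     "external-id-456",
		"sts:SourceIdentity": "abcd1234",
	}
	// Both values are correct, so the condition holds.
	fmt.Println(stringEqualsSatisfied(condition, assumeRoleRequest{"external-id-456", "abcd1234"}))
	// The external ID is correct but the resource ID differs, so it fails.
	fmt.Println(stringEqualsSatisfied(condition, assumeRoleRequest{"external-id-456", "other-project"}))
}
```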
1 parent cdb243c commit 78d71d3

5 files changed, +376 -2 lines changed

Lines changed: 45 additions & 0 deletions
@@ -0,0 +1,45 @@
+# REQUIRED
+# Kind can be one of:
+# - breaking-change: a change to previously-documented behavior
+# - deprecation: functionality that is being removed in a later release
+# - bug-fix: fixes a problem in a previous version
+# - enhancement: extends functionality but does not break or fix existing behavior
+# - feature: new functionality
+# - known-issue: problems that we are aware of in a given version
+# - security: impacts on the security of a product or a user’s deployment.
+# - upgrade: important information for someone upgrading from a prior version
+# - other: does not fit into any of the other categories
+kind: feature
+
+# REQUIRED for all kinds
+# Change summary; a 80ish characters long description of the change.
+summary: Introduce cloud connectors flow.
+
+# REQUIRED for breaking-change, deprecation, known-issue
+# Long description; in case the summary is not enough to describe the change
+# this field accommodate a description without length limits.
+# description:
+
+# REQUIRED for breaking-change, deprecation, known-issue
+# impact:
+
+# REQUIRED for breaking-change, deprecation, known-issue
+# action:
+
+# REQUIRED for all kinds
+# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
+component: "all"
+
+# AUTOMATED
+# OPTIONAL to manually add other PR URLs
+# PR URL: A link the PR that added the changeset.
+# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
+# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
+# Please provide it if you are adding a fragment for a different PR.
+# pr: https://github.com/owner/repo/1234
+
+# AUTOMATED
+# OPTIONAL to manually add other issue URLs
+# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
+# If not present is automatically filled by the tooling with the issue linked to the PR number.
+# issue: https://github.com/owner/repo/1234
Lines changed: 111 additions & 0 deletions
@@ -0,0 +1,111 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package aws
+
+import (
+	"errors"
+	"fmt"
+	"os"
+	"time"
+
+	awssdk "github.com/aws/aws-sdk-go-v2/aws"
+	"github.com/aws/aws-sdk-go-v2/credentials/stscreds"
+	"github.com/aws/aws-sdk-go-v2/service/sts"
+
+	"github.com/elastic/elastic-agent-libs/logp"
+)
+
+// These env vars are provided by agentless controller when the cloud connectors flow is enabled.
+const (
+	CloudConnectorsGlobalRoleEnvVar      = "CLOUD_CONNECTORS_GLOBAL_ROLE"
+	CloudConnectorsJWTPathEnvVar         = "CLOUD_CONNECTORS_ID_TOKEN_FILE"
+	CloudConnectorsCloudResourceIDEnvVar = "CLOUD_RESOURCE_ID"
+)
+
+// CloudConnectorsConfig is the config for the cloud connectors flow
+type CloudConnectorsConfig struct {
+	ElasticGlobalRoleARN string
+	IDTokenPath          string
+	CloudResourceID      string
+}
+
+func parseCloudConnectorsConfigFromEnv() (CloudConnectorsConfig, error) {
+	cc := CloudConnectorsConfig{
+		ElasticGlobalRoleARN: os.Getenv(CloudConnectorsGlobalRoleEnvVar),
+		IDTokenPath:          os.Getenv(CloudConnectorsJWTPathEnvVar),
+		CloudResourceID:      os.Getenv(CloudConnectorsCloudResourceIDEnvVar),
+	}
+
+	var errs []error
+
+	if cc.ElasticGlobalRoleARN == "" {
+		errs = append(errs, errors.New("elastic global role arn is not configured"))
+	}
+	if cc.IDTokenPath == "" {
+		errs = append(errs, errors.New("id token path is not configured"))
+	}
+	if cc.CloudResourceID == "" {
+		errs = append(errs, errors.New("cloud resource id is not configured"))
+	}
+
+	if len(errs) > 0 {
+		return CloudConnectorsConfig{}, fmt.Errorf("cloud connectors config is invalid: %w", errors.Join(errs...))
+	}
+
+	return cc, nil
+}
+
+const defaultIntermediateDuration = 20 * time.Minute
+
+func addCloudConnectorsCredentials(config ConfigAWS, cloudConnectorsConfig CloudConnectorsConfig, awsConfig *awssdk.Config, logger *logp.Logger) {
+	logger = logger.Named("addCloudConnectorsCredentials")
+	logger.Debug("Switching credentials provider to Cloud Connectors")
+
+	addCredentialsChain(
+		awsConfig,
+
+		// Step 1: Assume the Elastic Global Role with web identity using the ID token provided by the agentless OIDC issuer.
+		func(c awssdk.Config) awssdk.CredentialsProvider {
+			provider := stscreds.NewWebIdentityRoleProvider(
+				sts.NewFromConfig(c), // client uses credentials from previous config.
+				cloudConnectorsConfig.ElasticGlobalRoleARN,
+				stscreds.IdentityTokenFile(cloudConnectorsConfig.IDTokenPath),
+				func(opt *stscreds.WebIdentityRoleOptions) {
+					opt.Duration = defaultIntermediateDuration
+				},
+			)
+			return awssdk.NewCredentialsCache(provider)
+		},
+
+		// Step 2: Assume the remote role (the user's configured role), using the previously assumed role in the chain.
+		func(c awssdk.Config) awssdk.CredentialsProvider {
+			assumeRoleProvider := stscreds.NewAssumeRoleProvider(
+				sts.NewFromConfig(c), // client uses credentials from previous config.
+				config.RoleArn,
+				func(aro *stscreds.AssumeRoleOptions) {
+					aro.Duration = config.AssumeRoleDuration
+					if config.ExternalID != "" {
+						aro.ExternalID = awssdk.String(config.ExternalID)
+
+						// The source identity is set by the system (env var) rather than user input (package policy).
+						// It should be required by the remote role (the role to assume) as a condition for assuming it.
+						aro.SourceIdentity = awssdk.String(cloudConnectorsConfig.CloudResourceID)
+					}
+				},
+			)
+			return awssdk.NewCredentialsCache(assumeRoleProvider, func(options *awssdk.CredentialsCacheOptions) {
+				if config.AssumeRoleExpiryWindow > 0 {
+					options.ExpiryWindow = config.AssumeRoleExpiryWindow
+				}
+			})
+		},
+	)
+}
+
+func addCredentialsChain(awsConfig *awssdk.Config, chain ...func(awssdk.Config) awssdk.CredentialsProvider) {
+	for _, fn := range chain {
+		awsConfig.Credentials = fn(*awsConfig)
+	}
+}
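The chaining pattern used by `addCredentialsChain` can be sketched with standard-library types only. In this minimal illustration, `provider`, `config`, `link` and `addChain` are hypothetical stand-ins for `awssdk.CredentialsProvider`, `awssdk.Config` and the function in the diff, and no AWS calls are made. The point is that each step receives a copy of the config as it stands, so the second step builds on the credentials installed by the first.

```go
package main

import "fmt"

// provider is a hypothetical stand-in for awssdk.CredentialsProvider.
type provider interface {
	Describe() string
}

// config is a hypothetical stand-in for awssdk.Config; only the field
// relevant to the chaining pattern is modelled.
type config struct {
	Credentials provider
}

// link records the provider it was built from, so the order is visible.
type link struct {
	name string
	prev provider
}

func (l link) Describe() string {
	if l.prev == nil {
		return l.name
	}
	return l.prev.Describe() + " -> " + l.name
}

// addChain applies each step in order. Every step receives a copy of the
// config as it stands, so it sees the credentials set by the previous step.
func addChain(cfg *config, chain ...func(config) provider) {
	for _, fn := range chain {
		cfg.Credentials = fn(*cfg)
	}
}

func main() {
	cfg := &config{}
	addChain(cfg,
		func(c config) provider { return link{name: "global-role (web identity)", prev: c.Credentials} },
		func(c config) provider { return link{name: "remote-role (assume role)", prev: c.Credentials} },
	)
	// Prints: global-role (web identity) -> remote-role (assume role)
	fmt.Println(cfg.Credentials.Describe())
}
```

Only the last provider ends up on the config, and it reaches the earlier one through the value it captured, which is how the remote role assumption ends up being made with the Elastic Global Role credentials.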
Lines changed: 177 additions & 0 deletions
@@ -0,0 +1,177 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package aws
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"net/url"
+	"os"
+	"path"
+	"testing"
+	"time"
+
+	"github.com/aws/aws-sdk-go-v2/aws"
+	"github.com/aws/aws-sdk-go-v2/service/sts"
+	"github.com/aws/aws-sdk-go-v2/service/sts/types"
+	"github.com/aws/smithy-go/middleware"
+	smithyhttp "github.com/aws/smithy-go/transport/http"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"github.com/elastic/elastic-agent-libs/logp/logptest"
+)
+
+func TestAddCloudConnectorsCredentials(t *testing.T) {
+	config := ConfigAWS{
+		RoleArn:                "arn:aws:iam::123456789012:role/customer-role",
+		ExternalID:             "external-id-456",
+		AssumeRoleDuration:     2 * time.Hour,
+		AssumeRoleExpiryWindow: 10 * time.Minute,
+	}
+	cloudConnectorsConfig := CloudConnectorsConfig{
+		ElasticGlobalRoleARN: "arn:aws:iam::999999999999:role/elastic-global-role",
+		CloudResourceID:      "abcd1234",
+	}
+	tokenFileContent := "abc123"
+
+	tmpDir := t.TempDir()
+	pth := path.Join(tmpDir, "id_token")
+	_ = os.WriteFile(path.Join(tmpDir, "id_token"), []byte(tokenFileContent), 0o644)
+	cloudConnectorsConfig.IDTokenPath = pth
+
+	// Create a base AWS config
+	awsConfig := &aws.Config{
+		Region:       "us-east-1",
+		BaseEndpoint: aws.String("https://aws.mock"),
+	}
+
+	// Create a test logger
+	logger := logptest.NewTestingLogger(t, "")
+
+	// mock responses
+	receivedCalls := 0
+	awsConfig.APIOptions = append(awsConfig.APIOptions, func(stack *middleware.Stack) error {
+		return stack.Finalize.Add(
+			middleware.FinalizeMiddlewareFunc(
+				"mock",
+				func(ctx context.Context, in middleware.FinalizeInput, next middleware.FinalizeHandler) (middleware.FinalizeOutput, middleware.Metadata, error) {
+					req, is := in.Request.(*smithyhttp.Request)
+					require.Truef(t, is, "request expected to be of type *smithyhttp.Request, got: %T", in.Request)
+					receivedCalls++
+					bd, err := io.ReadAll(req.GetStream())
+					assert.NoError(t, req.RewindStream())
+					assert.NoError(t, err)
+					body := string(bd)
+
+					switch receivedCalls {
+
+					// Expect the first request to be AssumeRoleWithWebIdentity
+					case 1:
+						q, err := url.ParseQuery(body)
+						assert.NoError(t, err)
+						assert.Equal(t, "AssumeRoleWithWebIdentity", q.Get("Action"))
+						assert.Equal(t, "1200", q.Get("DurationSeconds"))
+						assert.Equal(t, cloudConnectorsConfig.ElasticGlobalRoleARN, q.Get("RoleArn"))
+						assert.Equal(t, tokenFileContent, q.Get("WebIdentityToken"))
+						return middleware.FinalizeOutput{
+							Result: &sts.AssumeRoleWithWebIdentityOutput{
+								Credentials: &types.Credentials{
+									AccessKeyId:     aws.String("AKIAFAKEEXAMPLE00001"),
+									SecretAccessKey: aws.String("FAKEwJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY1"),
+									SessionToken:    aws.String("FwoGZXIvYXdzEFAaDFAKESESSIONTOKENEXAMPLE1"),
+									Expiration:      aws.Time(time.Now().Add(defaultIntermediateDuration)),
+								},
+							},
+						}, middleware.Metadata{}, nil
+
+					// Expect the second request to be AssumeRole
+					case 2:
+						q, err := url.ParseQuery(body)
+						assert.NoError(t, err)
+						assert.Equal(t, "AssumeRole", q.Get("Action"))
+						assert.Equal(t, "7200", q.Get("DurationSeconds"))
+						assert.Equal(t, config.ExternalID, q.Get("ExternalId"))
+						assert.Equal(t, config.RoleArn, q.Get("RoleArn"))
+						assert.Equal(t, "abcd1234", q.Get("SourceIdentity"))
+						return middleware.FinalizeOutput{
+							Result: &sts.AssumeRoleOutput{
+								Credentials: &types.Credentials{
+									AccessKeyId:     aws.String("AKIAFAKEEXAMPLE00002"),
+									SecretAccessKey: aws.String("FAKEwJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY2"),
+									SessionToken:    aws.String("FwoGZXIvYXdzEFAaDFAKESESSIONTOKENEXAMPLE2"),
+									Expiration:      aws.Time(time.Now().Add(defaultIntermediateDuration)),
+								},
+							},
+						}, middleware.Metadata{}, nil
+
+					default:
+						t.Fatal("unexpected aws sdk call")
+						return middleware.FinalizeOutput{}, middleware.Metadata{}, fmt.Errorf("unexpected operation")
+					}
+				},
+			),
+			middleware.After,
+		)
+	})
+
+	// Call the function under test
+	addCloudConnectorsCredentials(
+		config,
+		cloudConnectorsConfig,
+		awsConfig,
+		logger,
+	)
+
+	// Verify that credentials provider was set
+	require.NotNil(t, awsConfig.Credentials, "credentials provider should be set")
+
+	crd, err := awsConfig.Credentials.Retrieve(t.Context())
+	require.NoError(t, err)
+	require.NotNil(t, crd)
+	require.Equal(t, 2, receivedCalls)
+}
+
+func TestParseCloudConnectorsConfigFromEnv(t *testing.T) {
+	t.Run("happy_path", func(t *testing.T) {
+		t.Setenv(CloudConnectorsGlobalRoleEnvVar, "arn:aws:iam::999999999999:role/elastic-global-role")
+		t.Setenv(CloudConnectorsJWTPathEnvVar, "/path/token")
+		t.Setenv(CloudConnectorsCloudResourceIDEnvVar, "abc123")
+
+		got, err := parseCloudConnectorsConfigFromEnv()
+
+		require.NoError(t, err)
+
+		assert.Equal(
+			t,
+			CloudConnectorsConfig{
+				ElasticGlobalRoleARN: "arn:aws:iam::999999999999:role/elastic-global-role",
+				IDTokenPath:          "/path/token",
+				CloudResourceID:      "abc123",
+			},
+			got,
+		)
+	})
+
+	t.Run("missing config single", func(t *testing.T) {
+		t.Setenv(CloudConnectorsGlobalRoleEnvVar, "arn:aws:iam::999999999999:role/elastic-global-role")
+		t.Setenv(CloudConnectorsJWTPathEnvVar, "/path/token")
+
+		got, err := parseCloudConnectorsConfigFromEnv()
+
+		require.ErrorContains(t, err, "cloud resource id")
+		assert.Equal(t, CloudConnectorsConfig{}, got)
+	})
+
+	t.Run("missing config all", func(t *testing.T) {
+		got, err := parseCloudConnectorsConfigFromEnv()
+
+		require.ErrorContains(t, err, "elastic global role")
+		require.ErrorContains(t, err, "id token")
+		require.ErrorContains(t, err, "cloud resource id")
+		assert.Equal(t, CloudConnectorsConfig{}, got)
+	})
+}
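The mock middleware in the test above reads each request body and parses it with `url.ParseQuery`, then asserts on parameters such as `Action` and `DurationSeconds`. The sketch below reproduces that parsing step in isolation. The helpers `durationSeconds`, `buildBody` and `param` are hypothetical and exist only for this illustration; in the real test the body is produced by the AWS SDK.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
	"time"
)

// durationSeconds formats a duration as whole seconds in base 10, the form
// the test expects to find in the DurationSeconds parameter.
func durationSeconds(d time.Duration) string {
	return strconv.FormatInt(int64(d/time.Second), 10)
}

// buildBody assembles a form-encoded body with some of the keys the test
// asserts on. Hypothetical helper: the real body comes from the AWS SDK.
func buildBody(action, roleARN string, d time.Duration) string {
	v := url.Values{}
	v.Set("Action", action)
	v.Set("RoleArn", roleARN)
	v.Set("DurationSeconds", durationSeconds(d))
	return v.Encode()
}

// param parses a form-encoded body, as the mock middleware does, and
// returns the value of a single parameter.
func param(body, key string) (string, error) {
	q, err := url.ParseQuery(body)
	if err != nil {
		return "", err
	}
	return q.Get(key), nil
}

func main() {
	body := buildBody(
		"AssumeRoleWithWebIdentity",
		"arn:aws:iam::999999999999:role/elastic-global-role",
		20*time.Minute,
	)
	for _, key := range []string{"Action", "RoleArn", "DurationSeconds"} {
		value, err := param(body, key)
		if err != nil {
			panic(err)
		}
		fmt.Printf("%s=%s\n", key, value)
	}
}
```

This also makes the expected values in the test easier to read: the 20 minute intermediate duration corresponds to `1200` seconds, and the configured 2 hour `AssumeRoleDuration` corresponds to `7200`.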

x-pack/libbeat/common/aws/credentials.go

Lines changed: 19 additions & 2 deletions
@@ -52,6 +52,10 @@ type ConfigAWS struct {
 	// AssumeRoleExpiryWindow will allow the credentials to trigger refreshing prior to the credentials
 	// actually expiring. If expiry_window is less than or equal to zero, the setting is ignored.
 	AssumeRoleExpiryWindow time.Duration `config:"assume_role.expiry_window"`
+
+	// UseCloudConnectors indicates whether the cloud connectors flow is used.
+	// If this is true, the InitializeAWSConfig should initialize the AWS cloud connector role chaining flow.
+	UseCloudConnectors bool `config:"use_cloud_connectors"`
 }
 
 // InitializeAWSConfig function creates the awssdk.Config object from the provided config
@@ -66,10 +70,19 @@ func InitializeAWSConfig(beatsConfig ConfigAWS, logger *logp.Logger) (awssdk.Con
 	}
 
 	// Assume IAM role if iam_role config parameter is given
-	if beatsConfig.RoleArn != "" {
+	if beatsConfig.RoleArn != "" && !beatsConfig.UseCloudConnectors {
 		addAssumeRoleProviderToAwsConfig(beatsConfig, &awsConfig, logger)
 	}
 
+	// If cloud connectors method is selected from config, initialize the role chaining.
+	if beatsConfig.UseCloudConnectors {
+		cloudConnectorsConfig, err := parseCloudConnectorsConfigFromEnv()
+		if err != nil {
+			return awsConfig, err
+		}
+		addCloudConnectorsCredentials(beatsConfig, cloudConnectorsConfig, &awsConfig, logger)
+	}
+
 	var proxy func(*http.Request) (*url.URL, error)
 	if beatsConfig.ProxyUrl != "" {
 		proxyUrl, err := httpcommon.NewProxyURIFromString(beatsConfig.ProxyUrl)
@@ -142,7 +155,11 @@ func getConfigSharedCredentialProfile(beatsConfig ConfigAWS, logger *logp.Logger
 		return cfg, fmt.Errorf("awsConfig.LoadDefaultConfig failed with shared credential profile given: [%w]", err)
 	}
 
-	logger.Debug("Using shared credential profile for AWS credential")
+	if beatsConfig.ProfileName != "" || beatsConfig.SharedCredentialFile != "" {
+		logger.Debug("Using shared credential profile for AWS credential")
+	} else {
+		logger.Debug("Using default config for AWS")
+	}
 	return cfg, nil
 }
 
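The branching added to `InitializeAWSConfig` makes the plain assume-role path and the cloud connectors path mutually exclusive. A minimal sketch of that decision follows. `selectFlow` and the flow labels are hypothetical names for this illustration, and "base credentials" simply means whatever the earlier, unshown part of `InitializeAWSConfig` resolved.

```go
package main

import "fmt"

// Flow labels are hypothetical and used only in this sketch.
const (
	flowBase            = "base credentials"
	flowAssumeRole      = "assume role"
	flowCloudConnectors = "cloud connectors role chaining"
)

// selectFlow mirrors the branching in the diff: a configured role ARN leads
// to a plain AssumeRole only when cloud connectors are not enabled, and the
// cloud connectors flag takes the role chaining path regardless.
func selectFlow(roleArn string, useCloudConnectors bool) string {
	switch {
	case useCloudConnectors:
		return flowCloudConnectors
	case roleArn != "":
		return flowAssumeRole
	default:
		return flowBase
	}
}

func main() {
	arn := "arn:aws:iam::123456789012:role/customer-role"
	fmt.Println(selectFlow(arn, false)) // assume role
	fmt.Println(selectFlow(arn, true))  // cloud connectors role chaining
	fmt.Println(selectFlow("", false))  // base credentials
}
```

With the flag enabled, the role ARN is still used, but as the target of the second step in the chain instead of a direct `AssumeRole` from the base credentials.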