Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

operator: adds AWS sts support #11481

Merged
merged 22 commits into from Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions operator/CHANGELOG.md
@@ -1,5 +1,6 @@
## Main

- [11481](https://github.com/grafana/loki/pull/11481) **JoaoBraveCoding**: Adds AWS STS support
- [11473](https://github.com/grafana/loki/pull/11473) **JoaoBraveCoding**: Adds structured metadata dashboards
- [11448](https://github.com/grafana/loki/pull/11448) **periklis**: Update Loki operand to v2.9.3
- [11357](https://github.com/grafana/loki/pull/11357) **periklis**: Fix storing authentication credentials in the Loki ConfigMap
Expand Down
38 changes: 26 additions & 12 deletions operator/internal/handlers/internal/storage/secrets.go
Expand Up @@ -126,26 +126,38 @@ func extractGCSConfigSecret(s *corev1.Secret) (*storage.GCSStorageConfig, error)

func extractS3ConfigSecret(s *corev1.Secret) (*storage.S3StorageConfig, error) {
// Extract and validate mandatory fields
endpoint := s.Data[storage.KeyAWSEndpoint]
if len(endpoint) == 0 {
return nil, kverrors.New("missing secret field", "field", storage.KeyAWSEndpoint)
}
buckets := s.Data[storage.KeyAWSBucketNames]
if len(buckets) == 0 {
return nil, kverrors.New("missing secret field", "field", storage.KeyAWSBucketNames)
}

// Fields related with static authentication
endpoint := s.Data[storage.KeyAWSEndpoint]
id := s.Data[storage.KeyAWSAccessKeyID]
if len(id) == 0 {
return nil, kverrors.New("missing secret field", "field", storage.KeyAWSAccessKeyID)
}
secret := s.Data[storage.KeyAWSAccessKeySecret]
if len(secret) == 0 {
return nil, kverrors.New("missing secret field", "field", storage.KeyAWSAccessKeySecret)
}

// Extract and validate optional fields
// Fields related with STS authentication
roleArn := s.Data[storage.KeyAWSRoleArn]
audience := s.Data[storage.KeyAWSAudience]
// Optional fields
region := s.Data[storage.KeyAWSRegion]

if len(roleArn) == 0 {
if len(endpoint) == 0 {
return nil, kverrors.New("missing secret field", "field", storage.KeyAWSEndpoint)
}
if len(id) == 0 {
return nil, kverrors.New("missing secret field", "field", storage.KeyAWSAccessKeyID)
}
if len(secret) == 0 {
return nil, kverrors.New("missing secret field", "field", storage.KeyAWSAccessKeySecret)
}
} else {
// In the STS case region is not an optional field
if len(region) == 0 {
return nil, kverrors.New("missing secret field", "field", storage.KeyAWSRegion)
}
}

sseCfg, err := extractS3SSEConfig(s.Data)
if err != nil {
return nil, err
Expand All @@ -155,6 +167,8 @@ func extractS3ConfigSecret(s *corev1.Secret) (*storage.S3StorageConfig, error) {
Endpoint: string(endpoint),
Buckets: string(buckets),
Region: string(region),
RoleArn: string(roleArn),
JoaoBraveCoding marked this conversation as resolved.
Show resolved Hide resolved
Audience: string(audience),
SSE: sseCfg,
}, nil
}
Expand Down
34 changes: 34 additions & 0 deletions operator/internal/handlers/internal/storage/secrets_test.go
Expand Up @@ -320,6 +320,40 @@ func TestS3Extract(t *testing.T) {
},
},
},
{
name: "STS missing region",
secret: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{Name: "test"},
Data: map[string][]byte{
"bucketnames": []byte("this,that"),
"role_arn": []byte("role"),
},
},
wantErr: true,
},
{
name: "STS with region",
secret: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{Name: "test"},
Data: map[string][]byte{
"bucketnames": []byte("this,that"),
"role_arn": []byte("role"),
"region": []byte("here"),
},
},
},
{
name: "STS all set",
secret: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{Name: "test"},
Data: map[string][]byte{
"bucketnames": []byte("this,that"),
"role_arn": []byte("role"),
"region": []byte("here"),
"audience": []byte("audience"),
},
},
},
}
for _, tst := range table {
tst := tst
Expand Down
2 changes: 1 addition & 1 deletion operator/internal/manifests/compactor.go
Expand Up @@ -26,7 +26,7 @@ func BuildCompactor(opts Options) ([]client.Object, error) {
}
}

if err := storage.ConfigureStatefulSet(statefulSet, opts.ObjectStorage); err != nil {
if err := storage.ConfigureStatefulSet(statefulSet, opts.ObjectStorage, opts.Gates.OpenShift.Enabled); err != nil {
periklis marked this conversation as resolved.
Show resolved Hide resolved
return nil, err
}

Expand Down
2 changes: 1 addition & 1 deletion operator/internal/manifests/indexgateway.go
Expand Up @@ -27,7 +27,7 @@ func BuildIndexGateway(opts Options) ([]client.Object, error) {
}
}

if err := storage.ConfigureStatefulSet(statefulSet, opts.ObjectStorage); err != nil {
if err := storage.ConfigureStatefulSet(statefulSet, opts.ObjectStorage, opts.Gates.OpenShift.Enabled); err != nil {
return nil, err
}

Expand Down
2 changes: 1 addition & 1 deletion operator/internal/manifests/ingester.go
Expand Up @@ -27,7 +27,7 @@ func BuildIngester(opts Options) ([]client.Object, error) {
}
}

if err := storage.ConfigureStatefulSet(statefulSet, opts.ObjectStorage); err != nil {
if err := storage.ConfigureStatefulSet(statefulSet, opts.ObjectStorage, opts.Gates.OpenShift.Enabled); err != nil {
return nil, err
}

Expand Down
196 changes: 196 additions & 0 deletions operator/internal/manifests/internal/config/build_test.go
Expand Up @@ -5418,3 +5418,199 @@ analytics:
})
}
}

func TestBuild_ConfigAndRuntimeConfig_STS_WIF(t *testing.T) {
defaultSchema := []lokiv1.ObjectStorageSchema{
{
Version: lokiv1.ObjectStorageSchemaV11,
EffectiveDate: "2020-10-01",
},
}
for _, tc := range []struct {
name string
objStorageConfig storage.Options
expStorageConfig string
}{
{
name: "aws_sts",
objStorageConfig: storage.Options{
SharedStore: lokiv1.ObjectStorageSecretS3,
S3: &storage.S3StorageConfig{
RoleArn: "my-role-arn",
Region: "my-region",
Buckets: "my-bucket",
},
Schemas: defaultSchema,
},
expStorageConfig: `
s3:
s3: s3://my-region/my-bucket
s3forcepathstyle: false`,
},
} {
JoaoBraveCoding marked this conversation as resolved.
Show resolved Hide resolved
t.Run(tc.name, func(t *testing.T) {
expCfg := `
---
auth_enabled: true
chunk_store_config:
chunk_cache_config:
embedded_cache:
enabled: true
max_size_mb: 500
common:
storage:
${STORAGE_CONFIG}
compactor_grpc_address: loki-compactor-grpc-lokistack-dev.default.svc.cluster.local:9095
ring:
kvstore:
store: memberlist
heartbeat_period: 5s
heartbeat_timeout: 1m
instance_port: 9095
compactor:
compaction_interval: 2h
working_directory: /tmp/loki/compactor
frontend:
tail_proxy_url: http://loki-querier-http-lokistack-dev.default.svc.cluster.local:3100
compress_responses: true
max_outstanding_per_tenant: 4096
log_queries_longer_than: 5s
frontend_worker:
frontend_address: loki-query-frontend-grpc-lokistack-dev.default.svc.cluster.local:9095
grpc_client_config:
max_send_msg_size: 104857600
match_max_concurrent: true
ingester:
chunk_block_size: 262144
chunk_encoding: snappy
chunk_idle_period: 1h
chunk_retain_period: 5m
chunk_target_size: 2097152
flush_op_timeout: 10m
lifecycler:
final_sleep: 0s
join_after: 30s
num_tokens: 512
ring:
replication_factor: 1
max_chunk_age: 2h
max_transfer_retries: 0
wal:
enabled: true
dir: /tmp/wal
replay_memory_ceiling: 2500
ingester_client:
grpc_client_config:
max_recv_msg_size: 67108864
remote_timeout: 1s
# NOTE: Keep the order of keys as in Loki docs
# to enable easy diffs when vendoring newer
# Loki releases.
# (See https://grafana.com/docs/loki/latest/configuration/#limits_config)
#
# Values for not exposed fields are taken from the grafana/loki production
# configuration manifests.
# (See https://github.com/grafana/loki/blob/main/production/ksonnet/loki/config.libsonnet)
limits_config:
ingestion_rate_strategy: global
ingestion_rate_mb: 4
ingestion_burst_size_mb: 6
max_label_name_length: 1024
max_label_value_length: 2048
max_label_names_per_series: 30
reject_old_samples: true
reject_old_samples_max_age: 168h
creation_grace_period: 10m
enforce_metric_name: false
# Keep max_streams_per_user always to 0 to default
# using max_global_streams_per_user always.
# (See https://github.com/grafana/loki/blob/main/pkg/ingester/limiter.go#L73)
max_streams_per_user: 0
max_line_size: 256000
max_entries_limit_per_query: 5000
max_global_streams_per_user: 0
max_chunks_per_query: 2000000
max_query_length: 721h
max_query_parallelism: 32
tsdb_max_query_parallelism: 512
max_query_series: 500
cardinality_limit: 100000
max_streams_matchers_per_query: 1000
max_cache_freshness_per_query: 10m
per_stream_rate_limit: 3MB
per_stream_rate_limit_burst: 15MB
split_queries_by_interval: 30m
query_timeout: 1m
allow_structured_metadata: true
memberlist:
abort_if_cluster_join_fails: true
advertise_port: 7946
bind_port: 7946
join_members:
- loki-gossip-ring-lokistack-dev.default.svc.cluster.local:7946
max_join_backoff: 1m
max_join_retries: 10
min_join_backoff: 1s
querier:
engine:
max_look_back_period: 30s
extra_query_delay: 0s
max_concurrent: 2
query_ingesters_within: 3h
tail_max_duration: 1h
query_range:
align_queries_with_step: true
cache_results: true
max_retries: 5
results_cache:
cache:
embedded_cache:
enabled: true
max_size_mb: 500
parallelise_shardable_queries: true
schema_config:
configs:
- from: "2020-10-01"
index:
period: 24h
prefix: index_
object_store: s3
schema: v11
store: boltdb-shipper
server:
graceful_shutdown_timeout: 5s
grpc_server_min_time_between_pings: '10s'
grpc_server_ping_without_stream_allowed: true
grpc_server_max_concurrent_streams: 1000
grpc_server_max_recv_msg_size: 104857600
grpc_server_max_send_msg_size: 104857600
http_listen_port: 3100
http_server_idle_timeout: 30s
http_server_read_timeout: 30s
http_server_write_timeout: 10m0s
log_level: info
storage_config:
boltdb_shipper:
active_index_directory: /tmp/loki/index
cache_location: /tmp/loki/index_cache
cache_ttl: 24h
resync_interval: 5m
shared_store: s3
index_gateway_client:
server_address: dns:///loki-index-gateway-grpc-lokistack-dev.default.svc.cluster.local:9095
tracing:
enabled: false
analytics:
reporting_enabled: true
`
expCfg = strings.Replace(expCfg, "${STORAGE_CONFIG}", tc.expStorageConfig, 1)

opts := defaultOptions()
opts.ObjectStorage = tc.objStorageConfig

cfg, _, err := Build(opts)
require.NoError(t, err)
require.YAMLEq(t, expCfg, string(cfg))
})
}
}
Expand Up @@ -24,11 +24,17 @@ common:
{{- end }}
{{- with .ObjectStorage.S3 }}
s3:
{{- if .RoleArn }}
s3: "s3://{{.Region}}/{{.Buckets}}"
s3forcepathstyle: false
{{- else }}
s3: {{ .Endpoint }}
bucketnames: {{ .Buckets }}
region: {{ .Region }}
access_key_id: ${AWS_ACCESS_KEY_ID}
secret_access_key: ${AWS_ACCESS_KEY_SECRET}
s3forcepathstyle: true
{{- end }}
{{- with .SSE }}
{{- if .Type }}
sse:
Expand All @@ -42,7 +48,6 @@ common:
{{- end}}
{{- end }}
{{- end }}
s3forcepathstyle: true
{{- end }}
{{- with .ObjectStorage.Swift }}
swift:
Expand Down
2 changes: 1 addition & 1 deletion operator/internal/manifests/querier.go
Expand Up @@ -27,7 +27,7 @@ func BuildQuerier(opts Options) ([]client.Object, error) {
}
}

if err := storage.ConfigureDeployment(deployment, opts.ObjectStorage); err != nil {
if err := storage.ConfigureDeployment(deployment, opts.ObjectStorage, opts.Gates.OpenShift.Enabled); err != nil {
return nil, err
}

Expand Down
2 changes: 1 addition & 1 deletion operator/internal/manifests/ruler.go
Expand Up @@ -29,7 +29,7 @@ func BuildRuler(opts Options) ([]client.Object, error) {
}
}

if err := storage.ConfigureStatefulSet(statefulSet, opts.ObjectStorage); err != nil {
if err := storage.ConfigureStatefulSet(statefulSet, opts.ObjectStorage, opts.Gates.OpenShift.Enabled); err != nil {
return nil, err
}

Expand Down