Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
421 changes: 421 additions & 0 deletions config/crd/bases/postgres-operator.crunchydata.com_pgadmins.yaml

Large diffs are not rendered by default.

Large diffs are not rendered by default.

27 changes: 19 additions & 8 deletions internal/collector/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,19 @@ package collector
import (
"k8s.io/apimachinery/pkg/util/sets"
"sigs.k8s.io/yaml"

"github.com/crunchydata/postgres-operator/pkg/apis/postgres-operator.crunchydata.com/v1beta1"
)

// ComponentID represents a component identifier within an OpenTelemetry
// Collector YAML configuration. Each value is a "type" followed by an optional
// slash-then-name: `type[/name]`
type ComponentID string
type ComponentID = string

// PipelineID represents a pipeline identifier within an OpenTelemetry Collector
// YAML configuration. Each value is a signal followed by an optional
// slash-then-name: `signal[/name]`
type PipelineID = string

// Config represents an OpenTelemetry Collector YAML configuration.
// See: https://opentelemetry.io/docs/collector/configuration
Expand All @@ -35,11 +42,6 @@ type Pipeline struct {
Receivers []ComponentID
}

// PipelineID represents a pipeline identifier within an OpenTelemetry Collector
// YAML configuration. Each value is a signal followed by an optional
// slash-then-name: `signal[/name]`
type PipelineID string

func (c *Config) ToYAML() (string, error) {
const yamlGeneratedWarning = "" +
"# Generated by postgres-operator. DO NOT EDIT.\n" +
Expand Down Expand Up @@ -71,8 +73,8 @@ func (c *Config) ToYAML() (string, error) {
}

// NewConfig creates a base config for an OTel collector container
func NewConfig() *Config {
return &Config{
func NewConfig(spec *v1beta1.InstrumentationSpec) *Config {
config := &Config{
Exporters: map[ComponentID]any{
// TODO: Do we want a DebugExporter outside of development?
// https://pkg.go.dev/go.opentelemetry.io/collector/exporter/debugexporter#section-readme
Expand All @@ -90,4 +92,13 @@ func NewConfig() *Config {
Receivers: map[ComponentID]any{},
Pipelines: map[PipelineID]Pipeline{},
}

// If there are exporters defined in the spec, add them to the config.
if spec != nil && spec.Config != nil && spec.Config.Exporters != nil {
for k, v := range spec.Config.Exporters {
config.Exporters[k] = v
}
}

return config
}
36 changes: 33 additions & 3 deletions internal/collector/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ import (
)

func TestConfigToYAML(t *testing.T) {
result, err := NewConfig().ToYAML()
assert.NilError(t, err)
assert.DeepEqual(t, result, `# Generated by postgres-operator. DO NOT EDIT.
t.Run("NilInstrumentationSpec", func(t *testing.T) {
result, err := NewConfig(nil).ToYAML()
assert.NilError(t, err)
assert.DeepEqual(t, result, `# Generated by postgres-operator. DO NOT EDIT.
# Your changes will not be saved.
exporters:
debug:
Expand All @@ -30,4 +31,33 @@ service:
extensions: []
pipelines: {}
`)
})

t.Run("InstrumentationSpecDefined", func(t *testing.T) {
spec := testInstrumentationSpec()

result, err := NewConfig(spec).ToYAML()
assert.NilError(t, err)
assert.DeepEqual(t, result, `# Generated by postgres-operator. DO NOT EDIT.
# Your changes will not be saved.
exporters:
debug:
verbosity: detailed
googlecloud:
log:
default_log_name: opentelemetry.io/collector-exported-log
project: google-project-name
extensions: {}
processors:
batch/1s:
timeout: 1s
batch/200ms:
timeout: 200ms
groupbyattrs/compact: {}
receivers: {}
service:
extensions: []
pipelines: {}
`)
})
}
29 changes: 29 additions & 0 deletions internal/collector/helpers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2024 - 2025 Crunchy Data Solutions, Inc.
//
// SPDX-License-Identifier: Apache-2.0

package collector

import (
"github.com/crunchydata/postgres-operator/pkg/apis/postgres-operator.crunchydata.com/v1beta1"
)

func testInstrumentationSpec() *v1beta1.InstrumentationSpec {
spec := v1beta1.InstrumentationSpec{
Config: &v1beta1.InstrumentationConfigSpec{
Exporters: map[string]any{
"googlecloud": map[string]any{
"log": map[string]any{
"default_log_name": "opentelemetry.io/collector-exported-log",
},
"project": "google-project-name",
},
},
},
Logs: &v1beta1.InstrumentationLogsSpec{
Exporters: []string{"googlecloud"},
},
}

return spec.DeepCopy()
}
7 changes: 7 additions & 0 deletions internal/collector/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/crunchydata/postgres-operator/internal/feature"
"github.com/crunchydata/postgres-operator/internal/initialize"
"github.com/crunchydata/postgres-operator/internal/naming"
"github.com/crunchydata/postgres-operator/pkg/apis/postgres-operator.crunchydata.com/v1beta1"
)

// AddToConfigMap populates the shared ConfigMap with fields needed to run the Collector.
Expand All @@ -33,6 +34,7 @@ func AddToConfigMap(
// AddToPod adds the OpenTelemetry collector container to a given Pod
func AddToPod(
ctx context.Context,
spec *v1beta1.InstrumentationSpec,
pullPolicy corev1.PullPolicy,
inInstanceConfigMap *corev1.ConfigMap,
outPod *corev1.PodSpec,
Expand Down Expand Up @@ -63,6 +65,11 @@ func AddToPod(
}},
}

// If the user has specified files to be mounted in the spec, add them to the projected config volume
if spec != nil && spec.Config != nil && spec.Config.Files != nil {
configVolume.Projected.Sources = append(configVolume.Projected.Sources, spec.Config.Files...)
}

container := corev1.Container{
Name: naming.ContainerCollector,
Image: "ghcr.io/open-telemetry/opentelemetry-collector-releases/opentelemetry-collector-contrib:0.117.0",
Expand Down
29 changes: 28 additions & 1 deletion internal/collector/patroni.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,20 @@ func EnablePatroniLogging(ctx context.Context,
},
}

// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/-/processor/resourceprocessor#readme
outConfig.Processors["resource/patroni"] = map[string]any{
"attributes": []map[string]any{
// Container and Namespace names need no escaping because they are DNS labels.
// Pod names need no escaping because they are DNS subdomains.
//
// https://kubernetes.io/docs/concepts/overview/working-with-objects/names
// https://github.com/open-telemetry/semantic-conventions/blob/v1.29.0/docs/resource/k8s.md
{"action": "insert", "key": "k8s.container.name", "value": naming.ContainerDatabase},
{"action": "insert", "key": "k8s.namespace.name", "value": "${env:K8S_POD_NAMESPACE}"},
{"action": "insert", "key": "k8s.pod.name", "value": "${env:K8S_POD_NAME}"},
},
}

// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/-/processor/transformprocessor#readme
outConfig.Processors["transform/patroni_logs"] = map[string]any{
"log_statements": []map[string]any{{
Expand Down Expand Up @@ -86,14 +100,27 @@ func EnablePatroniLogging(ctx context.Context,
}},
}

// If there are exporters to be added to the logs pipelines defined in
// the spec, add them to the pipeline. Otherwise, add the DebugExporter.
var exporters []ComponentID
if inCluster.Spec.Instrumentation != nil &&
inCluster.Spec.Instrumentation.Logs != nil &&
inCluster.Spec.Instrumentation.Logs.Exporters != nil {
exporters = inCluster.Spec.Instrumentation.Logs.Exporters
} else {
exporters = []ComponentID{DebugExporter}
}

outConfig.Pipelines["logs/patroni"] = Pipeline{
Extensions: []ComponentID{"file_storage/patroni_logs"},
Receivers: []ComponentID{"filelog/patroni_jsonlog"},
Processors: []ComponentID{
"resource/patroni",
"transform/patroni_logs",
SubSecondBatchProcessor,
CompactingProcessor,
},
Exporters: []ComponentID{DebugExporter},
Exporters: exporters,
}
}
}
Expand Down
104 changes: 102 additions & 2 deletions internal/collector/patroni_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ import (
)

func TestEnablePatroniLogging(t *testing.T) {
t.Run("Enabled", func(t *testing.T) {
t.Run("NilInstrumentationSpec", func(t *testing.T) {
gate := feature.NewGate()
assert.NilError(t, gate.SetFromMap(map[string]bool{
feature.OpenTelemetryLogs: true,
}))
ctx := feature.NewContext(context.Background(), gate)

config := NewConfig()
config := NewConfig(nil)

EnablePatroniLogging(ctx, new(v1beta1.PostgresCluster), config)

Expand All @@ -44,6 +44,17 @@ processors:
batch/200ms:
timeout: 200ms
groupbyattrs/compact: {}
resource/patroni:
attributes:
- action: insert
key: k8s.container.name
value: database
- action: insert
key: k8s.namespace.name
value: ${env:K8S_POD_NAMESPACE}
- action: insert
key: k8s.pod.name
value: ${env:K8S_POD_NAME}
transform/patroni_logs:
log_statements:
- context: log
Expand Down Expand Up @@ -76,8 +87,97 @@ service:
exporters:
- debug
processors:
- resource/patroni
- transform/patroni_logs
- batch/200ms
- groupbyattrs/compact
receivers:
- filelog/patroni_jsonlog
`)
})

t.Run("InstrumentationSpecDefined", func(t *testing.T) {
gate := feature.NewGate()
assert.NilError(t, gate.SetFromMap(map[string]bool{
feature.OpenTelemetryLogs: true,
}))
ctx := feature.NewContext(context.Background(), gate)

cluster := new(v1beta1.PostgresCluster)
cluster.Spec.Instrumentation = testInstrumentationSpec()
config := NewConfig(cluster.Spec.Instrumentation)

EnablePatroniLogging(ctx, cluster, config)

result, err := config.ToYAML()
assert.NilError(t, err)
assert.DeepEqual(t, result, `# Generated by postgres-operator. DO NOT EDIT.
# Your changes will not be saved.
exporters:
debug:
verbosity: detailed
googlecloud:
log:
default_log_name: opentelemetry.io/collector-exported-log
project: google-project-name
extensions:
file_storage/patroni_logs:
create_directory: true
directory: /pgdata/patroni/log/receiver
fsync: true
processors:
batch/1s:
timeout: 1s
batch/200ms:
timeout: 200ms
groupbyattrs/compact: {}
resource/patroni:
attributes:
- action: insert
key: k8s.container.name
value: database
- action: insert
key: k8s.namespace.name
value: ${env:K8S_POD_NAMESPACE}
- action: insert
key: k8s.pod.name
value: ${env:K8S_POD_NAME}
transform/patroni_logs:
log_statements:
- context: log
statements:
- set(instrumentation_scope.name, "patroni")
- set(cache, ParseJSON(body["original"]))
- set(severity_text, cache["levelname"])
- set(severity_number, SEVERITY_NUMBER_DEBUG) where severity_text == "DEBUG"
- set(severity_number, SEVERITY_NUMBER_INFO) where severity_text == "INFO"
- set(severity_number, SEVERITY_NUMBER_WARN) where severity_text == "WARNING"
- set(severity_number, SEVERITY_NUMBER_ERROR) where severity_text == "ERROR"
- set(severity_number, SEVERITY_NUMBER_FATAL) where severity_text == "CRITICAL"
- set(time, Time(cache["asctime"], "%F %T,%L"))
- set(attributes["log.record.original"], body["original"])
- set(body, cache["message"])
receivers:
filelog/patroni_jsonlog:
include:
- /pgdata/patroni/log/*.log
operators:
- from: body
to: body.original
type: move
storage: file_storage/patroni_logs
service:
extensions:
- file_storage/patroni_logs
pipelines:
logs/patroni:
exporters:
- googlecloud
processors:
- resource/patroni
- transform/patroni_logs
- batch/200ms
- groupbyattrs/compact
receivers:
- filelog/patroni_jsonlog
`)
Expand Down
Loading
Loading